Throttle pandas apply when using an API call
I have a large DataFrame with an address column:

      data   addr
0  0.617964  IN,Krishnagiri,635115
1  0.635428  IN,Chennai,600005
2  0.630125  IN,Karnal,132001
3  0.981282  IN,Jaipur,302021
4  0.715813  IN,Chennai,600005
...

and I've written the following function to replace each address with its latitude and longitude coordinates:

from geopy.geocoders import Nominatim
geo_locator = Nominatim(user_agent="MY_APP_ID")

def get_coordinates(addr):
    location = geo_locator.geocode(addr)
    if location is not None:
        return pd.Series({'lat': location.latitude, 'lon': location.longitude})
    location = geo_locator.geocode(addr.split(',')[0])
    if location is not None:
        return pd.Series({'lat': location.latitude, 'lon': location.longitude})
    return pd.Series({'lat': -1, 'lon': -1})

Then I call the pandas apply method on the address column and concatenate the result back onto the DataFrame, dropping the address column:

df = pd.concat([df, df.addr.apply(get_coordinates)], axis=1).drop(['addr'], axis=1)

However, since get_coordinates calls a third-party API, it fails with: geopy.exc.GeocoderTimedOut: Service timed out

How do I throttle the requests to make sure I get a response before continuing to the next value?
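One general way to throttle is a small wrapper that enforces a minimum interval between calls. Below is a sketch using only the standard library; the names `throttle` and `geocode_throttled` are illustrative, and the real call to `geo_locator.geocode` is stubbed out so the example runs offline. (Recent geopy versions also ship `geopy.extra.rate_limiter.RateLimiter`, which wraps a geocoder callable with a minimum delay and retry handling.)

```python
import functools
import time

def throttle(min_interval):
    """Decorator: enforce at least `min_interval` seconds between calls."""
    def decorator(func):
        last_call = [float('-inf')]   # time of the previous call
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = min_interval - (time.monotonic() - last_call[0])
            if wait > 0:
                time.sleep(wait)
            last_call[0] = time.monotonic()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@throttle(min_interval=1.0)
def geocode_throttled(addr):
    # placeholder: in the real code this would call geo_locator.geocode(addr)
    return addr
```

The first call goes through immediately; each subsequent call sleeps just long enough to keep at least one second between requests.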

Update:
For further improvement, I would like to call the API only on unique values, i.e., if the address IN,Krishnagiri,635115 appears 20 times in my DataFrame, I would like to call the API only once and apply the result to all 20 occurrences.
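One way to sketch this is to apply the function only to the unique addresses and join the results back on the address column. In the example below, `get_coordinates` is replaced by a counting stub that returns dummy coordinates, so the example runs without touching the API:

```python
import pandas as pd

df = pd.DataFrame({'addr': ['IN,Chennai,600005', 'IN,Karnal,132001',
                            'IN,Chennai,600005', 'IN,Chennai,600005']})

api_calls = []

def get_coordinates(addr):
    # stub standing in for the real geocoding call; returns dummy coordinates
    api_calls.append(addr)
    return pd.Series({'lat': 1.0, 'lon': 2.0})

unique_addrs = df['addr'].drop_duplicates()
coords = unique_addrs.apply(get_coordinates)  # one call per unique address
coords.index = unique_addrs                   # key the results by address
result = df.join(coords, on='addr')           # broadcast to all occurrences
```

Here the stub is invoked only twice (once per unique address), yet every row of `result` ends up with lat/lon columns.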

Update 2:
Log and stack trace for @Andrew Lavers' code:

...
Fetched Gandipet, Khanapur, Rangareddy District, Telangana, 500075, India
Fetched Jaipur Municipal Corporation, Jaipur, Rajasthan, 302015, India
Fetched Chennai, Chennai district, Tamil Nadu, India
Exception from geolocator: Fake exception for testing
Backing off for 1 seconds.
Exception from geolocator: Fake exception for testing
Backing off for 3 seconds.
Fetched None
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/geopy/geocoders/base.py", line 344, in _call_geocoder
    page = requester(req, timeout=timeout, **kwargs)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 526, in open
    response = self._open(req, data)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 544, in _open
    '_open', req)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 504, in _call_chain
    result = func(*args)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 1361, in https_open
    context=self._context, check_hostname=self._check_hostname)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 1321, in do_open
    r = h.getresponse()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1331, in getresponse
    response.begin()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 297, in begin
    version, status, reason = self._read_status()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 258, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/ssl.py", line 1002, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/ssl.py", line 865, in read
    return self._sslobj.read(len, buffer)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/ssl.py", line 625, in read
    v = self._sslobj.read(len, buffer)
socket.timeout: The read operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/...//tmp.py", line 89, in <module>
    df.addr.apply(get_coordinates)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pandas/core/series.py", line 3194, in apply
    mapped = lib.map_infer(values, f, convert=convert_dtype)
  File "pandas/_libs/src/inference.pyx", line 1472, in pandas._libs.lib.map_infer
  File "/Users/...//tmp.py", line 76, in get_coordinates
    location = geo_locator.geocode(addr.split(',')[0])
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/geopy/geocoders/osm.py", line 307, in geocode
    self._call_geocoder(url, timeout=timeout), exactly_one
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/geopy/geocoders/base.py", line 371, in _call_geocoder
    raise GeocoderTimedOut('Service timed out')
geopy.exc.GeocoderTimedOut: Service timed out

Process finished with exit code 1
Ninette answered 25/11, 2018 at 9:39 Comment(2)
A time.sleep() after every call? – Conyers
Thanks for the reply, it seems like there must be a more robust solution. – Ninette
Here is some tested code that may help:

1) Simple rate limiting to what the API specifies (Nominatim appears to allow 1 request per second, but I got success with intervals as low as 0.1 seconds).
2) Simple result caching in a dictionary, controllable by a parameter for testing.
3) A retry loop with multiplicative slowdown and linear speedup (it slows down quickly and speeds up more slowly).
4) A test exception for faking errors.

I cannot replicate the issues you are experiencing; they are likely due to your network path to the API.

A more robust strategy may be to build a local persistent cache and keep retrying until the full batch is built. The cache could be a pandas DataFrame written to a CSV file. The overall pseudocode is something like:

repeat until all addresses are in the cache:
    cache = pd.read_csv("cache.csv")
    addresses_to_get = addresses in df that are not in cache
    for batch of n addresses in addresses_to_get:
        cache.add(get_location(addr))
    cache.to_csv("cache.csv")
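Under those assumptions, a minimal runnable version of that loop might look like the following. The file name, batch size, and `fake_get_location` stub are all illustrative; in real use the stub would be the throttled geocoder call:

```python
import os
import pandas as pd

CACHE_FILE = "geo_cache.csv"

def fake_get_location(addr):
    # stand-in for the real, throttled geocoder call
    return {'addr': addr, 'lat': 1.0, 'lon': 2.0}

def geocode_all(df, batch_size=100):
    # load the persistent cache if it exists, otherwise start empty
    if os.path.exists(CACHE_FILE):
        cache = pd.read_csv(CACHE_FILE)
    else:
        cache = pd.DataFrame(columns=['addr', 'lat', 'lon'])

    # repeat until every unique address is cached
    while True:
        todo = df.loc[~df['addr'].isin(cache['addr']), 'addr'].drop_duplicates()
        if todo.empty:
            break
        rows = [fake_get_location(a) for a in todo.head(batch_size)]
        cache = pd.concat([cache, pd.DataFrame(rows)], ignore_index=True)
        cache.to_csv(CACHE_FILE, index=False)  # persist after each batch
    return df.merge(cache, on='addr', how='left')
```

Because the cache is rewritten after every batch, a crash or timeout partway through loses at most one batch of work; rerunning the script picks up where it left off.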

Here is the tested code:

import datetime
import time

import pandas as pd
from geopy.geocoders import Nominatim
geo_locator = Nominatim(user_agent="[email protected]")


# Define the rate limit function and associated global variable

last_time = datetime.datetime.now()
backoff_time = 0

def rate_limit(min_interval_seconds=0.1):
    global last_time
    sleep = min_interval_seconds - (datetime.datetime.now() - last_time).total_seconds() 
    if sleep > 0 :
        print(f'Sleeping for {sleep} seconds')
        time.sleep(sleep)
    last_time = datetime.datetime.now()

# make a cache dictionary keyed by address 
geo_cache = {}
backoff_seconds = 0

def get_coordinates_with_retry(addr):

    global backoff_seconds
    location = None

    # set the backoff initial values and factors
    max_backoff_seconds = 60
    backoff_exponential = 2
    backoff_linear = 2

    # rate limited API call
    rate_limit()

    # Retry until max_backoff_seconds is reached

    while backoff_seconds < max_backoff_seconds:   # backoff up to this time
        if backoff_seconds > 0:
            print(f"Backing off for {backoff_seconds} seconds.")
            time.sleep(backoff_seconds)
        try:
            location = geo_locator.geocode(addr)

            # REMOVE THIS: fake an error for testing
            #import random
            #if random.random() < .3:
            #    raise(Exception("Fake exception for testing"))

            # Success - so reduce the backoff linearly
            print (f"Fetched {location} for address {addr}")
            backoff_seconds = backoff_seconds - backoff_linear if backoff_seconds > backoff_linear else 0
            break

        except Exception as e:
            print(f"Exception from geolocator: {e}")
            # Back off exponentially
            backoff_seconds = 1 + backoff_seconds * backoff_exponential

    if backoff_seconds > max_backoff_seconds:
        raise Exception("Max backoff reached\n")

    return location

def get_coordinates(addr, useCache=True):

    # Return from cache if previously loaded
    global geo_cache
    if addr in geo_cache:
        return  geo_cache[addr]

    # Attempt using the full address
    location = get_coordinates_with_retry(addr)

    # Attempt using the first part only if None found
    if location is not None:
        result = pd.Series({'lat': location.latitude, 'lon': location.longitude})
    else:
        print(f"Trying split address for address {addr}")
        location = get_coordinates_with_retry(addr.split(',')[0])
        if location is not None:
            result = pd.Series({'lat': location.latitude, 'lon': location.longitude})
        else:
            result = pd.Series({'lat': -1, 'lon': -1})

    # assign to cache
    if useCache:
        geo_cache[addr] = result
    return result

# Use the test data

df = pd.DataFrame({'addr' : [
'IN,Krishnagiri,635115',  
'IN,Chennai,600005',
'IN,Karnal,132001',
'IN,Jaipur,302021',
'IN,Chennai,600005']})

# repeat the test data to make a larger set

df = pd.concat([df, df, df, df, df, df, df, df, df, df])

df.addr.apply(get_coordinates)
print(f"Address cache contains {len(geo_cache)} address locations.")
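The retry logic above can also be factored into a small reusable decorator. This is just a restatement of the same pattern with illustrative parameter names; unlike the code above, which shares backoff state globally across addresses, this version resets its backoff on every call:

```python
import functools
import time

def retry_with_backoff(max_wait=60, factor=2, initial=1):
    """Retry the wrapped call, growing the wait after each failure."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    wait = initial + wait * factor
                    if wait > max_wait:
                        raise   # give up: backoff limit reached
                    time.sleep(wait)
        return wrapper
    return decorator
```

Wrapping the geocode call with `@retry_with_backoff()` keeps the fetch function itself free of retry bookkeeping.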
Sarpedon answered 25/11, 2018 at 12:5 Comment(5)
Thanks for your reply, but I still get geopy.exc.GeocoderTimedOut: Service timed out if I'm using your code with df.addr.apply(get_coordinates). – Ninette
@ShlomiSchwartz, edited to use df.addr.apply with sample output, which works for me. I suggest you first investigate a single execution of the API to prove that any single call works. – Sarpedon
It works well on small sets, but still throws the exception when handling a large DF. – Ninette
@ShlomiSchwartz, edited to include retry with backoff. I suspect your issues have more to do with a reliable path to the API rather than the API itself. If there are different worldwide endpoints to choose from, you may want to try another. – Sarpedon
@ShlomiSchwartz, the error occurred in the second call to the geolocator for the split address. I have revised the code to ensure that every call is retryable; the fake exception for testing is removed, and the cache is enabled. – Sarpedon

© 2022 - 2024 — McMap. All rights reserved.