Throttle pandas apply, when using an API call

Posted 2019-01-25 00:28

I have a large DataFrame with an address column:

      data   addr
0  0.617964  IN,Krishnagiri,635115
1  0.635428  IN,Chennai,600005
2  0.630125  IN,Karnal,132001
3  0.981282  IN,Jaipur,302021
4  0.715813  IN,Chennai,600005
...

and I've written the following function to replace the address with the longitude and latitude coordinates of the address:

import pandas as pd
from geopy.geocoders import Nominatim
geo_locator = Nominatim(user_agent="MY_APP_ID")

def get_coordinates(addr):
    location = geo_locator.geocode(addr)
    if location is not None:
        return pd.Series({'lat': location.latitude, 'lon': location.longitude})
    location = geo_locator.geocode(addr.split(',')[0])
    if location is not None:
        return pd.Series({'lat': location.latitude, 'lon': location.longitude})
    return pd.Series({'lat': -1, 'lon': -1})

Then I call the pandas apply method on the address column and concatenate the result to the DataFrame in place of the address column:

df = pd.concat([df, df.addr.apply(get_coordinates)], axis=1).drop(['addr'], axis=1)

However, since get_coordinates calls a third-party API, it fails with: geopy.exc.GeocoderTimedOut: Service timed out

How do I throttle the requests to make sure I get a response before continuing to the next value?

Update:
As a further improvement, I would like to call the API only on unique values, i.e., if the address IN,Krishnagiri,635115 appears 20 times in my DataFrame, I would like to call the API only once and apply the result to all 20 occurrences.

Update 2:
Log and stack trace for @Andrew Lavers' code:

...
Fetched Gandipet, Khanapur, Rangareddy District, Telangana, 500075, India
Fetched Jaipur Municipal Corporation, Jaipur, Rajasthan, 302015, India
Fetched Chennai, Chennai district, Tamil Nadu, India
Exception from geolocator: Fake exception for testing
Backing off for 1 seconds.
Exception from geolocator: Fake exception for testing
Backing off for 3 seconds.
Fetched None
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/geopy/geocoders/base.py", line 344, in _call_geocoder
    page = requester(req, timeout=timeout, **kwargs)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 526, in open
    response = self._open(req, data)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 544, in _open
    '_open', req)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 504, in _call_chain
    result = func(*args)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 1361, in https_open
    context=self._context, check_hostname=self._check_hostname)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/urllib/request.py", line 1321, in do_open
    r = h.getresponse()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1331, in getresponse
    response.begin()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 297, in begin
    version, status, reason = self._read_status()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 258, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/ssl.py", line 1002, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/ssl.py", line 865, in read
    return self._sslobj.read(len, buffer)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/ssl.py", line 625, in read
    v = self._sslobj.read(len, buffer)
socket.timeout: The read operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/...//tmp.py", line 89, in <module>
    df.addr.apply(get_coordinates)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pandas/core/series.py", line 3194, in apply
    mapped = lib.map_infer(values, f, convert=convert_dtype)
  File "pandas/_libs/src/inference.pyx", line 1472, in pandas._libs.lib.map_infer
  File "/Users/...//tmp.py", line 76, in get_coordinates
    location = geo_locator.geocode(addr.split(',')[0])
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/geopy/geocoders/osm.py", line 307, in geocode
    self._call_geocoder(url, timeout=timeout), exactly_one
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/geopy/geocoders/base.py", line 371, in _call_geocoder
    raise GeocoderTimedOut('Service timed out')
geopy.exc.GeocoderTimedOut: Service timed out

Process finished with exit code 1

1 answer
疯言疯语
#2 · 2019-01-25 01:23

Here is some tested code that may help. It does the following:

1) Simple rate limiting to what the API specifies (Nominatim's limit appears to be 1 request per second, but I had success with intervals as low as 0.1 seconds).
2) Simple result caching in a dictionary, controllable by a parameter for testing.
3) A retry loop with multiplicative slowdown and linear speedup (it slows down quickly and speeds up more slowly).
4) A test exception for faking errors.
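To make point 3 concrete, here is a small simulation of how the backoff delay evolves. It uses the same update rules as the full code further down (one second plus double the previous delay after a failure, two seconds less after a success). The function name next_backoff and its parameter names are made up for this illustration.

```python
def next_backoff(current, succeeded, factor=2, step=2):
    """Return the next backoff delay in seconds.

    Failure: grow multiplicatively (1 + current * factor).
    Success: shrink linearly by `step`, never going below zero.
    """
    if succeeded:
        return current - step if current > step else 0
    return 1 + current * factor


# Three failures in a row followed by four successes.
outcomes = [False, False, False, True, True, True, True]
delay = 0
history = []
for ok in outcomes:
    delay = next_backoff(delay, ok)
    history.append(delay)

print(history)  # [1, 3, 7, 5, 3, 1, 0]
```

Three failures are enough to reach a 7 second delay, while it takes four successful calls to get back to zero.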

I cannot replicate the issues you are experiencing - likely due to your path to the API.

A more robust strategy may be to build a local persistent cache and keep retrying until the full batch is built. The cache could be a pandas DataFrame written to a CSV file. The overall pseudocode is something like:

repeat until all addresses are in the cache
    cache = pd.read_csv("cache.csv")
    addresses_to_get = addresses in df that are not in cache
    for batch of n addresses in addresses_to_get:
        cache.add(get_location(addr))
    cache.write_csv("cache.csv")
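
The pseudocode could be fleshed out roughly as follows. This is only a sketch under assumptions: fetch stands for whatever geocoding function you use and is assumed to return a (lat, lon) tuple or raise on failure; the helper names, the file name, and the max_passes limit are made up; and it uses the standard-library csv module instead of a pandas DataFrame to keep the example dependency-free.

```python
import csv
import os


def load_cache(path):
    """Read previously fetched coordinates from a CSV file, if it exists."""
    cache = {}
    if os.path.exists(path):
        with open(path, newline="") as f:
            for row in csv.DictReader(f):
                cache[row["addr"]] = (float(row["lat"]), float(row["lon"]))
    return cache


def save_cache(cache, path):
    """Write the whole cache back to disk."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["addr", "lat", "lon"])
        for addr, (lat, lon) in cache.items():
            writer.writerow([addr, lat, lon])


def fill_cache(addresses, fetch, path="cache.csv", batch_size=10, max_passes=5):
    """Fetch missing addresses in batches, saving after every batch.

    Addresses that fail are retried on the next pass, up to max_passes.
    """
    cache = load_cache(path)
    for _ in range(max_passes):
        # dict.fromkeys drops duplicates while keeping the original order
        missing = [a for a in dict.fromkeys(addresses) if a not in cache]
        if not missing:
            break
        for start in range(0, len(missing), batch_size):
            for addr in missing[start:start + batch_size]:
                try:
                    cache[addr] = fetch(addr)
                except Exception as e:
                    print(f"Failed for {addr}: {e}")
            save_cache(cache, path)
    return cache
```

Because the cache is saved after each batch, a crash or timeout loses at most one batch of work, and rerunning the script only requests the addresses that are still missing.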

Here is the tested code:

import datetime
import time

import pandas as pd
from geopy.geocoders import Nominatim
geo_locator = Nominatim(user_agent="notarealemail@gmail.com")


# Define the rate limit function and its associated global variable

last_time = datetime.datetime.now()

def rate_limit(min_interval_seconds=.1):
    # Sleep just long enough that calls are at least min_interval_seconds apart
    global last_time
    sleep = min_interval_seconds - (datetime.datetime.now() - last_time).total_seconds()
    if sleep > 0:
        print(f'Sleeping for {sleep} seconds')
        time.sleep(sleep)
    last_time = datetime.datetime.now()

# make a cache dictionary keyed by address 
geo_cache = {}
backoff_seconds = 0

def get_coordinates_with_retry(addr):

    # backoff_seconds is global so that a slowdown carries over to later calls
    global backoff_seconds

    # set the backoff initial values and factors
    max_backoff_seconds = 60
    backoff_exponential = 2
    backoff_linear = 2

    # rate limited API call
    rate_limit()

    # Retry until max_backoff_seconds is reached

    while backoff_seconds < max_backoff_seconds:   # backoff up to this time
        if backoff_seconds > 0:
            print(f"Backing off for {backoff_seconds} seconds.")
            time.sleep(backoff_seconds)
        try:
            location = geo_locator.geocode(addr)

            # REMOVE THIS: fake an error for testing
            #import random
            #if random.random() < .3:
            #    raise(Exception("Fake exception for testing"))

            # Success - so reduce the backoff linearly
            print (f"Fetched {location} for address {addr}")
            backoff_seconds = backoff_seconds - backoff_linear if backoff_seconds > backoff_linear else 0
            break

        except Exception as e:
            print(f"Exception from geolocator: {e}")
            # Back off exponentially
            backoff_seconds = 1 + backoff_seconds * backoff_exponential

    if backoff_seconds > max_backoff_seconds:
        raise Exception("Max backoff reached\n")

    return location

def get_coordinates(addr, useCache=True):

    # Return from cache if previously loaded (skipped when useCache is False)
    if useCache and addr in geo_cache:
        return geo_cache[addr]

    # Attempt using the full address
    location = get_coordinates_with_retry(addr)

    # Fall back to the first part of the address if nothing was found
    if location is None:
        print(f"Trying split address for address {addr}")
        location = get_coordinates_with_retry(addr.split(',')[0])

    if location is not None:
        result = pd.Series({'lat': location.latitude, 'lon': location.longitude})
    else:
        result = pd.Series({'lat': -1, 'lon': -1})

    # assign to cache
    if useCache:
        geo_cache[addr] = result
    return result

# Use the test data

df = pd.DataFrame({'addr' : [
'IN,Krishnagiri,635115',  
'IN,Chennai,600005',
'IN,Karnal,132001',
'IN,Jaipur,302021',
'IN,Chennai,600005']})

# repeat the test data to make a larger set

df = pd.concat([df] * 10, ignore_index=True)

df.addr.apply(get_coordinates)
print(f"Address cache contains {len(geo_cache)} address locations.")
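
The code above fills the cache but discards the return value of apply, so the coordinates never end up in the DataFrame. One possible way to write them back while still calling the geocoder once per distinct address is sketched below. fake_geocode is a stand-in with made-up coordinates so that the example runs offline; it is assumed to return a dict with lat and lon keys, so with the get_coordinates function above you would convert its result first, for example with .to_dict().

```python
import pandas as pd

calls = []  # records every address the stand-in geocoder is asked for


def fake_geocode(addr):
    """Stand-in for a real geocoder call; returns made-up coordinates."""
    calls.append(addr)
    return {'lat': float(len(addr)), 'lon': -float(len(addr))}


df = pd.DataFrame({
    'data': [0.617964, 0.635428, 0.630125, 0.981282, 0.715813],
    'addr': ['IN,Krishnagiri,635115', 'IN,Chennai,600005', 'IN,Karnal,132001',
             'IN,Jaipur,302021', 'IN,Chennai,600005'],
})

# Geocode each distinct address once; one row per address, indexed by address.
unique_addrs = df['addr'].unique()
coords = pd.DataFrame([fake_geocode(a) for a in unique_addrs], index=unique_addrs)

# Look up each row's address in coords, then drop the address column.
result = df.join(coords, on='addr').drop(columns='addr')

print(result)
print(f"Geocoder calls: {len(calls)} for {len(df)} rows")
```

Both Chennai rows receive the same coordinates from a single lookup, which is the behaviour asked for in the first update to the question.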