I have a list
of awaitables
that I want to pass to the asyncio.AbstractEventLoop
but I need to throttle the requests to a third party API.
I would like to avoid something that waits to pass the future
to the loop because in the meantime I block my loop waiting. What options do I have? Semaphores
and ThreadPools
will limit how many are running concurrently, but that's not my problem. I need to throttle my requests to 100/sec, but it doesn't matter how long it takes to complete the request.
This is a very concise (non)working example using the standard library, that demonstrates the problem. This is supposed to throttle at 100/sec but throttles at 116.651/sec. What's the best way to throttle the scheduling of an asynchronous request in asyncio?
Working code:
import asyncio
from threading import Lock
class PTBNL:
    """Issue requests whose completion is signalled through asyncio futures.

    Every request gets an integer id and a future stored under that id; the
    future is resolved when ``end_req`` is called for the id.  Batch requests
    are paced by delaying each dispatch by the wait time that the token
    bucket reports for it.
    """

    def __init__(self):
        self._req_id_seq = 0  # next request id to hand out
        self._futures = {}    # request id -> future awaiting completion
        # request id -> result used by end_req when no explicit result is
        # passed; nothing in this class writes to it.
        self._results = {}
        self.token_bucket = TokenBucket()
        self.token_bucket.set_rate(100)

    def run(self, *awaitables):
        """Drive the event loop.

        With no arguments the loop runs forever.  With one awaitable its
        result is returned; with several, the list of their results (via
        ``asyncio.gather``) is returned.
        """
        loop = asyncio.get_event_loop()
        if not awaitables:
            loop.run_forever()
        elif len(awaitables) == 1:
            return loop.run_until_complete(*awaitables)
        else:
            future = asyncio.gather(*awaitables)
            return loop.run_until_complete(future)

    def sleep(self, secs) -> bool:
        """Run the event loop for ``secs`` seconds, then return True."""
        self.run(asyncio.sleep(secs))
        return True

    def get_req_id(self) -> int:
        """Return the next request id and advance the sequence."""
        new_id = self._req_id_seq
        self._req_id_seq += 1
        return new_id

    def start_req(self, key):
        """Create, register and return the future for request ``key``."""
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        self._futures[key] = future
        return future

    def end_req(self, key, result=None):
        """Resolve and forget the future registered under ``key``.

        When ``result`` is None the value stored in ``_results`` for the key
        is used, falling back to an empty list.  Unknown keys and futures
        that are already done are ignored.
        """
        future = self._futures.pop(key, None)
        if future is None:
            return
        if result is None:
            result = self._results.pop(key, [])
        if not future.done():
            future.set_result(result)

    def req_data(self, req_id, obj):
        """Perform the request; this example completes it immediately."""
        # Do Some Work Here
        self.req_data_end(req_id)

    def req_data_end(self, req_id):
        """Report that request ``req_id`` finished and resolve its future."""
        print(req_id, " has ended")
        self.end_req(req_id)

    async def req_data_async(self, obj):
        """Issue a single request for ``obj`` and return its result."""
        req_id = self.get_req_id()
        future = self.start_req(req_id)
        self.req_data(req_id, obj)
        await future
        return future.result()

    async def req_data_batch_async(self, contracts):
        """Issue one paced request per contract and wait for all of them.

        Returns a tuple ``(futures, rate)`` where ``futures`` is the list of
        completed futures in request order and ``rate`` is the observed
        number of requests per second.  An empty batch returns ``([], 0.0)``
        instead of failing on an unset start time.
        """
        count = len(contracts)
        if count == 0:
            return [], 0.0

        loop = asyncio.get_event_loop()
        futures = []
        # The clock starts before the first dispatch so it is always bound.
        start = loop.time()
        for contract in contracts:
            req_id = self.get_req_id()
            futures.append(self.start_req(req_id))
            # consume() returns how long this request has to wait; the
            # dispatch is scheduled rather than slept on, so the loop is
            # never blocked while pacing.
            nap = self.token_bucket.consume(1)
            loop.call_later(nap, self.req_data, req_id, contract)

        await asyncio.gather(*futures)
        elapsed = loop.time() - start
        # A coarse clock can report no elapsed time at all; avoid dividing
        # by zero in that case.
        rate = count / elapsed if elapsed > 0 else float("inf")
        return futures, rate
class TokenBucket:
    """Token bucket that tells the caller how long a request must wait.

    Tokens refill at ``rate`` per second up to the bucket capacity.
    ``consume`` never blocks: it always takes the tokens (the balance may go
    negative) and returns the number of seconds after which the request may
    be sent.  State changes are guarded by a ``threading.Lock``.
    """

    def __init__(self, clock=None):
        """Create an unthrottled bucket (rate 0).

        ``clock`` is a zero-argument callable returning seconds as a float.
        It defaults to ``time.monotonic`` so the bucket can be created
        without a current asyncio event loop.
        """
        if clock is None:
            # Imported here so the block does not rely on a file-level
            # ``import time``.
            import time
            clock = time.monotonic
        self._clock = clock
        self.tokens = 0
        self.rate = 0
        # None means "capacity follows the rate", the original behaviour.
        self.capacity = None
        self.last = self._clock()
        self.lock = Lock()

    def set_rate(self, rate, capacity=None):
        """Set the refill rate (tokens per second) and fill the bucket.

        ``capacity`` is the largest burst that may be sent without waiting.
        It defaults to ``rate``; pass ``capacity=1`` to spread requests
        evenly instead of allowing an initial burst of ``rate`` requests.
        """
        with self.lock:
            self.rate = rate
            self.capacity = capacity
            self.tokens = rate if capacity is None else capacity

    def consume(self, tokens):
        """Take ``tokens`` from the bucket and return the delay in seconds.

        Returns 0 when the bucket is unthrottled (rate 0) or still holds
        enough tokens; otherwise returns the time needed to refill the
        deficit at the current rate.
        """
        with self.lock:
            if not self.rate:
                return 0
            now = self._clock()
            lapse = now - self.last
            self.last = now
            limit = self.rate if self.capacity is None else self.capacity
            # Refill for the elapsed time, never beyond the capacity.
            self.tokens = min(limit, self.tokens + lapse * self.rate)
            self.tokens -= tokens
            if self.tokens >= 0:
                return 0
            return -self.tokens / self.rate
if __name__ == '__main__':
    # Demo: send 500 paced requests, then print the completed futures and
    # the observed requests-per-second figure.
    asyncio.get_event_loop().set_debug(True)
    app = PTBNL()
    objs = list(range(500))
    futures, rate = app.run(app.req_data_batch_async(objs))
    print(futures)
    print(rate)
Edit: I've added a simple example of ThrottleTestApp
here using semaphores, but still can't throttle the execution:
import asyncio
import time
class ThrottleTestApp:
    """Throttle requests with a semaphore that is topped up periodically.

    Each request takes one permit from ``self.sem`` before doing its work
    and never gives it back; ``allow_requests`` hands out fresh permits once
    per period, which bounds how many requests start per period.
    """

    def __init__(self):
        self._req_id_seq = 0  # next request id to hand out
        self._futures = {}    # request id -> future awaiting completion
        # request id -> result used by end_req when no explicit result is
        # passed; nothing in this class writes to it.
        self._results = {}
        self.sem = asyncio.Semaphore()

    async def allow_requests(self, sem, rate=100, period=1):
        """Permit ``rate`` requests per ``period`` seconds (default 100/s).

        Runs until cancelled.  Schedule it as a task and cancel the task to
        stop it; ``req_data_batch_async`` does this for the duration of a
        batch.

        asyncio.Semaphore doesn't give us a great way to get at the value
        other than accessing sem._value.  We do that here, but creating a
        wrapper that adds a current_value method would make this cleaner.
        """
        while True:
            # Work out the shortfall once per tick.  Re-reading sem._value
            # after each release() is not reliable: on CPython versions
            # where release() hands the permit straight to a waiting task
            # the value never rises while tasks are queued, so such a loop
            # would wake every waiter in a single tick.
            for _ in range(max(0, rate - sem._value)):
                sem.release()
            await asyncio.sleep(period)  # Or spread more evenly with a
                                         # shorter period and a lower rate

    async def do_request(self, req_id, obj):
        """Wait for a permit, then perform the request."""
        # The permit is deliberately not released; allow_requests refills.
        await self.sem.acquire()
        # this is the work for the request
        self.req_data(req_id, obj)

    def run(self, *awaitables):
        """Drive the event loop.

        With no arguments the loop runs forever.  With one awaitable its
        result is returned; with several, the list of their results (via
        ``asyncio.gather``) is returned.
        """
        loop = asyncio.get_event_loop()
        if not awaitables:
            loop.run_forever()
        elif len(awaitables) == 1:
            return loop.run_until_complete(*awaitables)
        else:
            future = asyncio.gather(*awaitables)
            return loop.run_until_complete(future)

    def sleep(self, secs: float = 0.02) -> bool:
        """Run the event loop for ``secs`` seconds, then return True."""
        self.run(asyncio.sleep(secs))
        return True

    def get_req_id(self) -> int:
        """Return the next request id and advance the sequence."""
        new_id = self._req_id_seq
        self._req_id_seq += 1
        return new_id

    def start_req(self, key):
        """Create, register and return the future for request ``key``."""
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        self._futures[key] = future
        return future

    def end_req(self, key, result=None):
        """Resolve and forget the future registered under ``key``.

        When ``result`` is None the value stored in ``_results`` for the key
        is used, falling back to an empty list.  Unknown keys and futures
        that are already done are ignored.
        """
        future = self._futures.pop(key, None)
        if future is None:
            return
        if result is None:
            result = self._results.pop(key, [])
        if not future.done():
            future.set_result(result)

    def req_data(self, req_id, obj):
        """Perform the request; this example completes it immediately."""
        # This is the method that "does" something
        self.req_data_end(req_id)

    def req_data_end(self, req_id):
        """Report that request ``req_id`` finished and resolve its future."""
        print(req_id, " has ended")
        self.end_req(req_id)

    async def req_data_batch_async(self, objs):
        """Issue one throttled request per object and wait for all of them.

        Prints the observed requests-per-second figure and returns the list
        of completed futures in request order.  An empty batch returns an
        empty list.
        """
        futures = []
        workers = []
        # Scheduled before the requests so the first top-up happens before
        # any request asks for a permit.
        refill = asyncio.ensure_future(self.allow_requests(self.sem))
        start = time.perf_counter()
        try:
            for obj in objs:
                req_id = self.get_req_id()
                futures.append(self.start_req(req_id))
                # do_request is a coroutine: it has to be scheduled, a bare
                # call only creates a coroutine object that never runs.
                workers.append(
                    asyncio.ensure_future(self.do_request(req_id, obj)))
            # Awaiting the workers surfaces an exception raised while
            # sending instead of waiting forever on its future.
            await asyncio.gather(*workers)
            await asyncio.gather(*futures)
        finally:
            refill.cancel()
            # Collect the cancelled task so it is not left pending.
            await asyncio.gather(refill, return_exceptions=True)
        elapsed = time.perf_counter() - start

        count = len(futures)
        if count == 0:
            rate = 0.0
        elif elapsed > 0:
            rate = count / elapsed
        else:
            rate = float("inf")
        print("Roughly %s per second" % rate)
        return futures
if __name__ == '__main__':
    # Demo: push 10000 requests through the throttled batch call.
    asyncio.get_event_loop().set_debug(True)
    app = ThrottleTestApp()
    objs = list(range(10000))
    app.run(app.req_data_batch_async(objs))
You can do this by implementing the leaky bucket algorithm:
Note that we leak capacity from the bucket opportunistically; there is no need to run a separate async task just to lower the level. Instead, capacity is leaked out when testing for sufficient remaining capacity.
Note that tasks that wait for capacity are kept in an ordered dictionary, and when there might be capacity to spare again, the first still-waiting task is woken up early.
You can use this as a context manager; trying to acquire the bucket when it is full blocks until enough capacity has been freed again:
or you can call
acquire()
directly: Or you can simply test if there is space first:
Note that you can count some requests as 'heavier' or 'lighter' by increasing or decreasing the amount you 'drip' into the bucket:
Do be careful with this though; when mixing large and small drips, small drips tend to get run before large drips when at or close to the maximum rate, because there is a greater likelihood that there is enough free capacity for a smaller drip before there is space for a larger one.
Demo:
The bucket is filled up quickly at the start in a burst, causing the rest of the tasks to be spread out more evenly; every 2 seconds enough capacity is freed for another task to be handled.
The maximum burst size is equal to the maximum rate value, in the above demo that was set to 5. If you do not want to permit bursts, set the maximum rate to 1, and the time period to the minimum time between drips:
Another solution - using bounded semaphores - by a coworker, mentor, and friend, is the following:
Can still be used with the same
async with bucket
code as in @Martijn's answer