I followed up this tutorial: https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html and everything works fine when I am doing like 50 000 requests. But I need to do 1 milion API calls and then I have problem with this code:
url = "http://some_url.com/?id={}"
tasks = set()
sem = asyncio.Semaphore(MAX_SIM_CONNS)
for i in range(1, LAST_ID + 1):
task = asyncio.ensure_future(bound_fetch(sem, url.format(i)))
tasks.add(task)
responses = asyncio.gather(*tasks)
return await responses
Because Python needs to create 1 milion tasks, it basically just lags and then prints Killed
message in terminal. Is there any way to use a generator insted of pre-made set (or list) of urls? Thanks.
asyncio is memory bound (like any other program). You can not spawn more task that memory can hold. My guess is that you hit a memory limit. Check dmesg for more information.
1 millions RPS doesn't mean there is 1M tasks. A task can do several request in the same second.
Schedule all 1 million tasks at once
This is the code you are talking about. It takes up to 3 GB RAM so it is easily possible that it will be terminated by the operating system if you have low free memory.
import asyncio
from aiohttp import ClientSession
MAX_SIM_CONNS = 50
LAST_ID = 10**6
async def fetch(url, session):
async with session.get(url) as response:
return await response.read()
async def bound_fetch(sem, url, session):
async with sem:
await fetch(url, session)
async def fetch_all():
url = "http://localhost:8080/?id={}"
tasks = set()
async with ClientSession() as session:
sem = asyncio.Semaphore(MAX_SIM_CONNS)
for i in range(1, LAST_ID + 1):
task = asyncio.create_task(bound_fetch(sem, url.format(i), session))
tasks.add(task)
return await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(fetch_all())
Use queue to streamline the work
This is my suggestion how to use asyncio.Queue to pass URLs to worker tasks. The queue is filled as-needed, there is no pre-made list of URLs.
It takes only 30 MB RAM :)
import asyncio
from aiohttp import ClientSession
MAX_SIM_CONNS = 50
LAST_ID = 10**6
async def fetch(url, session):
async with session.get(url) as response:
return await response.read()
async def fetch_worker(url_queue):
async with ClientSession() as session:
while True:
url = await url_queue.get()
try:
if url is None:
# all work is done
return
response = await fetch(url, session)
# ...do something with the response
finally:
url_queue.task_done()
# calling task_done() is necessary for the url_queue.join() to work correctly
async def fetch_all():
url = "http://localhost:8080/?id={}"
url_queue = asyncio.Queue(maxsize=100)
worker_tasks = []
for i in range(MAX_SIM_CONNS):
wt = asyncio.create_task(fetch_worker(url_queue))
worker_tasks.append(wt)
for i in range(1, LAST_ID + 1):
await url_queue.put(url.format(i))
for i in range(MAX_SIM_CONNS):
# tell the workers that the work is done
await url_queue.put(None)
await url_queue.join()
await asyncio.gather(*worker_tasks)
if __name__ == '__main__':
asyncio.run(fetch_all())