I have the following scenario:
- Python 3.6+
- The input data is read from a file, line by line.
- A coroutine sends the data to an API (using aiohttp) and saves the result of the call to Mongo (using motor). So there's a lot of IO going on.

The code is written using async / await, and works just fine for individual calls executed manually.
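For illustration, an individual call looks roughly like the sketch below; the endpoint URL, payload shape, and database/collection names are placeholders, not the real ones:

import aiohttp
from motor.motor_asyncio import AsyncIOMotorClient

mongo = AsyncIOMotorClient()  # assumes a local MongoDB instance

async def process(line):
    # Placeholder endpoint and payload shape.
    async with aiohttp.ClientSession() as session:
        async with session.post('https://api.example.com/process',
                                json={'line': line}) as resp:
            result = await resp.json()
    # Placeholder database/collection names.
    await mongo.mydb.results.insert_one(result)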
What I don't know how to do is to consume the input data en masse.
All asyncio examples I've seen demonstrate asyncio.wait by sending a finite list as a parameter. But I can't simply send a list of tasks to it, because the input file may have millions of rows.
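The pattern those examples show is roughly this (a sketch, reusing the hypothetical process() coroutine from above); it builds every task up front, which is exactly what won't work here:

import asyncio

async def run_all(lines):
    # One task per line, all created up front - fine for a short, finite
    # list, but with millions of rows it means millions of tasks in memory.
    tasks = [asyncio.ensure_future(process(line)) for line in lines]
    await asyncio.wait(tasks)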
My scenario is about streaming data to a consumer, as if on a conveyor belt.
What else can I do? I want the program to process the data in the file using all the resources it can muster, but without getting overwhelmed.
My scenario is about streaming data to a consumer, as if on a conveyor belt. What else can I do?
You can create a fixed number of worker tasks, roughly corresponding to the capacity of your conveyor belt, and have them pop lines off a queue. For example:
import asyncio

async def consumer(queue):
    while True:
        line = await queue.get()
        # connect to API, Mongo, etc.
        ...
        queue.task_done()

async def producer():
    N_TASKS = 10
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(N_TASKS)
    tasks = [loop.create_task(consumer(queue)) for _ in range(N_TASKS)]
    try:
        with open('input') as f:
            for line in f:
                # Blocks when the queue is full, providing back-pressure
                # so the file is never read faster than it is consumed.
                await queue.put(line)
        await queue.join()
    finally:
        for t in tasks:
            t.cancel()
Since tasks, unlike threads, are lightweight and do not hog operating system resources, it is fine to err on the side of creating "too many" of them. asyncio can handle thousands of tasks without a hitch, although that is probably overkill for this task - tens will suffice.
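To drive the whole thing, something along these lines should work; note that asyncio.run only exists on Python 3.7+, so on 3.6 you run the coroutine through the loop directly:

import asyncio

if __name__ == '__main__':
    # On Python 3.7+ you could simply call asyncio.run(producer()).
    loop = asyncio.get_event_loop()
    loop.run_until_complete(producer())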