Python asyncio: process a potentially infinite list

Posted 2019-08-17 07:32

I have the following scenario:

  • Python 3.6+
  • The input data is read from a file, line by line.
  • A coroutine sends the data to an API (using aiohttp) and saves the result of the call to Mongo (using motor). So there's a lot of IO going on.
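
To make it concrete, the per-line coroutine looks roughly like this (a simplified sketch; the URL, database, and collection names are placeholders, not my real code):

import aiohttp
from motor.motor_asyncio import AsyncIOMotorClient

async def process_line(line, session, collection):
    # POST one line to the API and wait for the JSON response
    async with session.post('https://api.example.com/process',
                            json={'line': line}) as resp:
        result = await resp.json()
    # store the result in Mongo through motor's async driver
    await collection.insert_one(result)

# session and collection are created once and shared, e.g.:
#   session = aiohttp.ClientSession()
#   collection = AsyncIOMotorClient().mydb.results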

The code is written using async / await, and works just fine for individual calls executed manually.

What I don't know how to do is to consume the input data en masse.

All asyncio examples I've seen demonstrate asyncio.wait by sending a finite list as a parameter. But I can't simply send a list of tasks to it, because the input file may have millions of rows.
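
In code, the pattern from those examples would amount to something like the following sketch (reusing the process_line coroutine above), which builds one task per input line up front:

import asyncio

async def naive(session, collection):
    with open('input') as f:
        # one coroutine per line -- millions of them for a large file
        await asyncio.wait([process_line(line, session, collection)
                            for line in f])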

My scenario is about streaming data, as if on a conveyor belt, to a consumer.

What else can I do? I want the program to process the data in the file using all the resources it can muster, but without getting overwhelmed.

1 Answer
Aperson
Answered 2019-08-17 08:21

My scenario is about streaming data, as if on a conveyor belt, to a consumer. What else can I do?

You can create a fixed number of worker tasks, roughly corresponding to the capacity of your conveyor belt, and have them pull lines off a queue. For example:

import asyncio

async def consumer(queue):
    while True:
        line = await queue.get()
        # connect to API, Mongo, etc.
        ...
        queue.task_done()

async def producer():
    N_TASKS = 10
    loop = asyncio.get_event_loop()
    # the bounded queue provides back-pressure: put() blocks once N_TASKS
    # lines are waiting, so the file is read no faster than it is consumed
    queue = asyncio.Queue(N_TASKS)
    tasks = [loop.create_task(consumer(queue)) for _ in range(N_TASKS)]
    try:
        with open('input') as f:
            for line in f:
                await queue.put(line)
        # wait until every queued line has been marked task_done()
        await queue.join()
    finally:
        # the consumers loop forever, so cancel them once the work is done
        for t in tasks:
            t.cancel()

Since, unlike threads, tasks are lightweight and do not hog operating system resources, it is fine to err on the side of creating "too many" of them. asyncio can handle thousands of tasks without a hitch, although that is probably overkill for this task; tens will suffice.
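
In case it helps, one way to drive the whole thing on Python 3.6 (asyncio.run only appears in 3.7, where you could simply call asyncio.run(producer())):

import asyncio

loop = asyncio.get_event_loop()
loop.run_until_complete(producer())
loop.close()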
