How to iterate over a large list without blocking

Posted 2019-05-13 00:53

Question:

I have a Python script with a running asyncio event loop, and I want to know how to iterate over a large list without blocking the event loop, so that the loop keeps running.

I've tried making a custom class with __aiter__ and __anext__, which did not work. I've also tried making an async function that yields the results, but it still blocks.

Currently:

for index, item in enumerate(list_with_thousands_of_items):
    # do something

The custom class I've tried:

class Aiter:
    def __init__(self, iterable):
        self.iter_ = iter(iterable)

    async def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            object = next(self.iter_)
        except StopIteration:
            raise StopAsyncIteration
        return object

But that always results in

TypeError: 'async for' received an object from __aiter__ that does not implement __anext__: coroutine
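That particular TypeError comes from declaring __aiter__ as async def: under the async-iteration protocol, __aiter__ must be a plain method that returns the async iterator itself, not a coroutine. A minimal corrected sketch (note that fixing the protocol alone still does not stop the iteration from blocking, because __anext__ never awaits anything):

class Aiter:
    def __init__(self, iterable):
        self.iter_ = iter(iterable)

    def __aiter__(self):
        # plain (non-async) method: return the async iterator itself
        return self

    async def __anext__(self):
        try:
            item = next(self.iter_)
        except StopIteration:
            raise StopAsyncIteration
        # no await here, so control is still never handed back to the event loop
        return item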

The async function I made, which works but still blocks the event loop:

async def async_enumerate(iterable, start: int = 0):
    # nothing is awaited inside the loop, so control is never
    # handed back to the event loop while it runs
    for idx, i in enumerate(iterable, start):
        yield idx, i

Answer 1:

As @deceze pointed out, you can use await asyncio.sleep(0) to explicitly pass control to the event loop. There are problems with this approach, though.

Presumably the list is quite large, which is why you need special measures to keep it from blocking the event loop. But if the list is that large, forcing every loop iteration to yield to the event loop will slow it down considerably. You can alleviate that by adding a counter and only awaiting when i % 10 == 0, or i % 100 == 0, and so on, but then you have to guess how often to give up control. If you yield too often, you slow down your function; if you yield too seldom, you make the event loop unresponsive.
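A sketch of that counter-based trade-off, written as an async generator; the yield_every parameter is my own illustrative name, not something from the original post:

import asyncio

async def async_enumerate(iterable, start=0, yield_every=100):
    # like enumerate(), but hands control back to the event loop
    # every `yield_every` iterations
    for idx, item in enumerate(iterable, start):
        if idx % yield_every == 0:
            await asyncio.sleep(0)  # let other tasks run
        yield idx, item

It would be consumed with async for index, item in async_enumerate(list_with_thousands_of_items): ... from inside a coroutine.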

This can be avoided by using run_in_executor, as suggested by RafaëlDera. run_in_executor accepts a blocking function and offloads its execution to a thread pool. It immediately returns a future that can be awaited in asyncio and whose result, once available, will be the return value of the blocking function (if the blocking function raises, the exception is propagated instead). Awaiting that future suspends the coroutine until the function returns or raises in its thread, allowing the event loop to remain fully functional in the meantime. Since the blocking function and the event loop run in separate threads, the function doesn't need to do anything special to let the event loop run; they operate independently. Even the GIL is not a problem here, because the GIL ensures that control is regularly passed between threads.

With run_in_executor your code could look like this:

def process_the_list():
    for index, item in enumerate(list_with_thousands_of_items):
        # do something

loop = asyncio.get_event_loop()
await loop.run_in_executor(None, process_the_list)
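For completeness, here is a self-contained variant of the same idea (the names and numbers are mine, purely for illustration); the heartbeat task keeps printing while the list is processed in the executor, showing that the event loop stays responsive:

import asyncio
import time

list_with_thousands_of_items = list(range(5_000))

def process_the_list():
    # runs in a worker thread, so it is allowed to block
    total = 0
    for index, item in enumerate(list_with_thousands_of_items):
        total += item
        time.sleep(0.001)  # stand-in for real per-item work
    return total

async def heartbeat():
    while True:
        print("event loop is alive")
        await asyncio.sleep(0.5)

async def main():
    beat = asyncio.create_task(heartbeat())
    loop = asyncio.get_running_loop()
    total = await loop.run_in_executor(None, process_the_list)
    beat.cancel()
    print("done, total =", total)

asyncio.run(main())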


Answer 2:

asyncio is cooperative multitasking. The cooperative part comes from the fact that your function must yield execution back to the event loop to allow other things to run. Unless you await something (or end your function), you're hogging the event loop.

You can simply await some no-op; probably the most suitable is await asyncio.sleep(0). This ensures your task will resume as soon as possible, while still allowing other tasks to be scheduled.
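Applied to the loop from the question, that could look like the following sketch (the per-iteration await is what keeps it cooperative, at the cost in raw speed discussed in the other answer):

import asyncio

async def process_the_list(items):
    for index, item in enumerate(items):
        # do something with index and item here
        await asyncio.sleep(0)  # no-op await: give other tasks a chance to run

Run it as a task, e.g. asyncio.create_task(process_the_list(list_with_thousands_of_items)), so it proceeds alongside everything else on the event loop.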