I have a Python script with a running asyncio event loop, and I want to know how to iterate over a large list without blocking the event loop, so that the loop keeps running.
I've tried making a custom class with __aiter__ and __anext__, which did not work. I've also tried making an async function that yields the results, but it still blocks.
Currently:
for index, item in enumerate(list_with_thousands_of_items):
    # do something
The custom class I've tried:
class Aiter:
    def __init__(self, iterable):
        self.iter_ = iter(iterable)

    async def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            object = next(self.iter_)
        except StopIteration:
            raise StopAsyncIteration
        return object
But that always results in
TypeError: 'async for' received an object from __aiter__ that does not implement __anext__: coroutine
The async function I made, which works but still blocks the event loop, is:
async def async_enumerate(iterable, start: int = 0):
    for idx, i in enumerate(iterable, start):
        yield idx, i
As @deceze pointed out, you can use await asyncio.sleep(0) to explicitly pass control to the event loop. There are problems with this approach, though.
Presumably the list is quite large, which is why you needed special measures to keep the event loop unblocked. But if the list is that large, forcing every loop iteration to yield to the event loop will slow it down considerably. Of course, you can alleviate that by adding a counter and only awaiting when i % 10 == 0, or when i % 100 == 0, etc. But then you have to make an arbitrary decision (a guess) about how often to give up control. If you yield too often, you slow down your function; if you yield too seldom, you make the event loop unresponsive.
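For illustration only, here is a minimal sketch of that counter-based idea, built on the async_enumerate from the question; the every_n parameter and its default of 100 are my own arbitrary choices, not something from the question:

import asyncio

async def async_enumerate(iterable, start: int = 0, every_n: int = 100):
    # Async generator that yields control to the event loop once every
    # every_n items instead of on every single iteration.
    for idx, item in enumerate(iterable, start):
        if (idx - start) % every_n == 0:
            await asyncio.sleep(0)  # no-op await: lets other tasks run
        yield idx, item

The guessing problem remains: every_n is still a number you have to pick by hand.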
This can be avoided by using run_in_executor, as suggested by RafaëlDera. run_in_executor accepts a blocking function and offloads its execution to a thread pool. It immediately returns a future that can be awaited in asyncio and whose result, once available, will be the return value of the blocking function. (If the blocking function raises, the exception is propagated instead.) Awaiting that future suspends the coroutine until the function returns or raises in its thread, allowing the event loop to remain fully functional in the meantime. Since the blocking function and the event loop run in separate threads, the function doesn't need to do anything special to let the event loop run - they operate independently. Even the GIL is not a problem here, because the GIL ensures that control is regularly passed between threads.
With run_in_executor, your code could look like this:
def process_the_list():
    for index, item in enumerate(list_with_thousands_of_items):
        # do something

loop = asyncio.get_event_loop()
await loop.run_in_executor(None, process_the_list)
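Since the await above has to live inside a coroutine, here is a self-contained sketch of one way to wire it up; the summing body of process_the_list and the placeholder data are made up purely for illustration:

import asyncio

list_with_thousands_of_items = list(range(100_000))  # placeholder data

def process_the_list():
    # Plain blocking function; it runs in a worker thread of the default
    # executor, so it never blocks the event loop.
    total = 0
    for index, item in enumerate(list_with_thousands_of_items):
        total += item  # stand-in for "do something"
    return total

async def main():
    loop = asyncio.get_running_loop()
    # None means: use the loop's default ThreadPoolExecutor.
    result = await loop.run_in_executor(None, process_the_list)
    print(result)

asyncio.run(main())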
asyncio is cooperative multitasking. The cooperative part comes from the fact that your function must yield execution back to the event loop to allow other things to run. Unless you await something (or your function ends), you're hogging the event loop.
You can simply await some no-op event; probably the most suitable is await asyncio.sleep(0). This ensures your task will resume as soon as possible, while still allowing other tasks to be scheduled.
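As a minimal sketch of what that could look like with the loop from the question (the wrapper function and the per-iteration await are my own choices):

import asyncio

async def process_the_list(list_with_thousands_of_items):
    for index, item in enumerate(list_with_thousands_of_items):
        # do something with item here
        await asyncio.sleep(0)  # hand control back to the event loop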