Currently, I have an inefficient synchronous generator that makes many HTTP requests in sequence and yields the results. I'd like to use asyncio and aiohttp to parallelise the requests and thereby speed up this generator, but I want to keep it as an ordinary generator (not a PEP 525 async generator) so that the non-async code that calls it doesn't need to be modified. How can I create such a generator?
Answer 1:
asyncio.as_completed(), currently barely documented, takes an iterable of coroutines or futures and returns an iterable of futures in the order that the input futures complete. Normally, you'd loop over its result and await the members from inside an async function...
import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

async def main():
    for future in asyncio.as_completed([first(), second(), third()]):
        print(await future)

loop = asyncio.get_event_loop()

# Prints 'second', then 'third', then 'first'
loop.run_until_complete(main())
... but for the purpose of this question, what we want is to be able to yield these results from an ordinary generator, so that normal synchronous code can consume them without ever knowing that async functions are being used under the hood. We can do that by calling loop.run_until_complete() on the futures yielded by our as_completed() call...
import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed([first(), second(), third()]):
        yield loop.run_until_complete(future)

# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
    print(element)
In this way, we've exposed our async code to non-async-land in a manner that doesn't require callers to define any functions as async, or to even know that ordinary_generator is using asyncio under the hood.
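Applied to the question's actual HTTP use case, the same pattern might look like the sketch below. This is illustrative rather than a definitive implementation: fetch_pages, make_session, and URLS are hypothetical names, and it assumes aiohttp plus the same asyncio.get_event_loop() behaviour (a default loop, as on Python versions before 3.10) that the examples above rely on. The session is created and closed via run_until_complete() so that it lives on the same loop that drives the requests.

import asyncio
import aiohttp

# Hypothetical URL list; substitute your own.
URLS = ['https://example.com/a', 'https://example.com/b', 'https://example.com/c']

async def make_session():
    # Create the session inside the event loop, as aiohttp expects.
    return aiohttp.ClientSession()

async def fetch(session, url):
    # One GET request; returns the response body as text.
    async with session.get(url) as response:
        return await response.text()

def fetch_pages(urls):
    # Ordinary generator: yields each body as its request completes.
    loop = asyncio.get_event_loop()
    session = loop.run_until_complete(make_session())
    try:
        for future in asyncio.as_completed([fetch(session, url) for url in urls]):
            yield loop.run_until_complete(future)
    finally:
        loop.run_until_complete(session.close())

for body in fetch_pages(URLS):
    print(len(body))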
As an alternative implementation of ordinary_generator() that offers more flexibility in some circumstances, we can repeatedly call asyncio.wait() with the FIRST_COMPLETED flag instead of looping over as_completed():
import concurrent.futures

def ordinary_generator():
    loop = asyncio.get_event_loop()
    pending = [first(), second(), third()]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(
                pending,
                return_when=concurrent.futures.FIRST_COMPLETED
            )
        )
        for job in done:
            yield job.result()
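As before, callers consume this version with a plain for loop. One caveat worth noting: asyncio.wait() can return several completed jobs in a single done set, and sets are unordered, so results from one batch may be yielded in any order.

# Prints 'second', then 'third', then 'first' with the delays used above,
# though jobs that finish close together may be yielded in either order.
for element in ordinary_generator():
    print(element)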
This approach, maintaining a list of pending jobs, has the advantage that we can adapt it to add jobs to the pending list on the fly. This is useful in use cases where our async jobs can add an unpredictable number of further jobs to the queue, like a web spider that follows all links on each page that it visits.
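A minimal sketch of that adaptation, with visit() as a hypothetical stand-in for fetching a page and extracting its links (the page-numbering scheme is invented purely for illustration). It wraps jobs in tasks via loop.create_task() so that new work can be added to the pending set between waits:

import asyncio

async def visit(page):
    # Hypothetical stand-in for fetching a page: returns the page itself
    # plus the new pages (links) discovered on it.
    await asyncio.sleep(0.1)
    links = [page * 2, page * 2 + 1] if page < 4 else []
    return page, links

def spider_generator(seed_pages):
    # Ordinary generator that yields pages in completion order while
    # scheduling newly discovered pages on the fly.
    loop = asyncio.get_event_loop()
    pending = {loop.create_task(visit(page)) for page in seed_pages}
    while pending:
        done, pending = loop.run_until_complete(
            # asyncio.FIRST_COMPLETED is the same constant as the
            # concurrent.futures.FIRST_COMPLETED used above.
            asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        )
        for task in done:
            page, links = task.result()
            for link in links:
                pending.add(loop.create_task(visit(link)))
            yield page

# Prints pages 1 through 7, including ones discovered mid-crawl
for page in spider_generator([1]):
    print(page)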