Create generator that yields coroutine results as they complete

Published 2019-01-23 21:07

Question:

Currently, I have an inefficient synchronous generator that makes many HTTP requests in sequence and yields the results. I'd like to use asyncio and aiohttp to parallelise the requests and thereby speed up this generator, but I want to keep it as an ordinary generator (not a PEP 525 async generator) so that the non-async code that calls it doesn't need to be modified. How can I create such a generator?
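For concreteness, the kind of generator described might look like the following sketch. The names `fetch_url` and `slow_generator` are invented for illustration, and the blocking HTTP request is simulated with a sleep so the example is self-contained:

```python
import time

def fetch_url(url):
    # Placeholder for a blocking HTTP request, e.g. requests.get(url).text;
    # simulated with a sleep so this example needs no network access.
    time.sleep(0.01)
    return f"<body of {url}>"

def slow_generator(urls):
    # Each request blocks until it finishes before the next one starts,
    # so total time grows linearly with the number of URLs.
    for url in urls:
        yield fetch_url(url)

for page in slow_generator(["a", "b", "c"]):
    print(page)
```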

Answer 1:

asyncio.as_completed(), currently barely documented, takes an iterable of coroutines or futures and returns an iterable of futures in the order that the input futures complete. Normally, you'd loop over its result and await the members from inside an async function...

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

async def main():
    for future in asyncio.as_completed([first(), second(), third()]):
        print(await future)

loop = asyncio.get_event_loop()

# Prints 'second', then 'third', then 'first'
loop.run_until_complete(main())

... but for the purpose of this question, what we want is to be able to yield these results from an ordinary generator, so that normal synchronous code can consume them without ever knowing that async functions are being used under the hood. We can do that by calling loop.run_until_complete() on the futures yielded by our as_completed call...

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed([first(), second(), third()]):
        yield loop.run_until_complete(future)

# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
    print(element)

In this way, we've exposed our async code to non-async-land in a manner that doesn't require callers to define any functions as async, or to even know that ordinary_generator is using asyncio under the hood.

As an alternative implementation of ordinary_generator() that offers more flexibility in some circumstances, we can repeatedly call asyncio.wait() with the FIRST_COMPLETED flag instead of looping over as_completed():

import asyncio

def ordinary_generator():
    loop = asyncio.get_event_loop()
    # asyncio.wait() requires tasks (passing bare coroutines raises a
    # TypeError on Python 3.11+), so wrap the coroutines first.
    pending = {loop.create_task(coro) for coro in (first(), second(), third())}
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        )
        for job in done:
            yield job.result()

This approach, in which we maintain a set of pending jobs, has the advantage that we can adapt it to add jobs to the pending set on the fly. That's useful in cases where our async jobs can themselves enqueue an unpredictable number of further jobs, such as a web spider that follows all the links on each page it visits.
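A minimal sketch of that spider-like pattern follows. The names `crawl` and `spider`, the simulated link structure, and the sleep-based "fetch" are all invented for illustration; real code would issue aiohttp requests and parse links out of each response:

```python
import asyncio

async def crawl(url, depth):
    # Stand-in for fetching and parsing a page with aiohttp.
    await asyncio.sleep(0.01)
    # Pretend each page links to two more pages until we reach depth 0.
    links = [(f"{url}/{i}", depth - 1) for i in range(2)] if depth > 0 else []
    return url, links

def spider(start_url, max_depth=2):
    # Ordinary (synchronous) generator driving async jobs under the hood.
    loop = asyncio.new_event_loop()
    try:
        pending = {loop.create_task(crawl(start_url, max_depth))}
        while pending:
            done, pending = loop.run_until_complete(
                asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            )
            for job in done:
                url, links = job.result()
                # New jobs discovered by a finished job join the pending set.
                for link, depth in links:
                    pending.add(loop.create_task(crawl(link, depth)))
                yield url
    finally:
        loop.close()

for url in spider("root", max_depth=1):
    print(url)
```

As before, callers iterate over `spider()` like any other generator; the event loop runs only while the generator is waiting for the next completed job.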