asyncio as_yielded from async generators

2019-04-22 18:03发布

问题:

I'm looking to be able to yield from a number of async coroutines. Asyncio's as_completed is kind of close to what I'm looking for (i.e. I want any of the coroutines to be able to yield at any time back to the caller and then continue), but that only seems to allow regular coroutines with a single return.

Here's what I have so far:

import asyncio


async def test(id_):
    print(f'{id_} sleeping')
    await asyncio.sleep(id_)
    return id_


async def test_gen(id_):
    count = 0
    while True:
        print(f'{id_} sleeping')
        await asyncio.sleep(id_)
        yield id_
        count += 1
        if count > 5:
            return


async def main():
    runs = [test(i) for i in range(3)]

    for i in asyncio.as_completed(runs):
        i = await i
        print(f'{i} yielded')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

Replacing runs = [test(i) for i in range(3)] with runs = [test_gen(i) for i in range(3)] and for for i in asyncio.as_completed(runs) to iterate on each yield is what I'm after.

Is this possible to express in Python and are there any third party maybe that give you more options then the standard library for coroutine process flow?

Thanks

回答1:

You can use aiostream.stream.merge:

from aiostream import stream

async def main():
    runs = [test_gen(i) for i in range(3)]
    async for x in stream.merge(*runs):
        print(f'{x} yielded')

Run it in a safe context to make sure the generators are cleaned up properly after the iteration:

async def main():
    runs = [test_gen(i) for i in range(3)]
    merged = stream.merge(*runs)
    async with merged.stream() as streamer:
        async for x in streamer:
            print(f'{x} yielded')

Or make it more compact using pipes:

from aiostream import stream, pipe

async def main():
    runs = [test_gen(i) for i in range(3)]
    await (stream.merge(*runs) | pipe.print('{} yielded'))

More examples in the documentation.


Adressing @nirvana-msu comment

It is possible to identify the generator that yielded a given value by preparing sources accordingly:

async def main():
    runs = [test_gen(i) for i in range(3)]
    sources = [stream.map(xs, lambda x: (i, x)) for i, xs in enumerate(runs)]
    async for i, x in stream.merge(*sources):
        print(f'ID {i}: {x}')