Merging async iterables in python3

Posted 2019-02-26 18:07

Question:

Is there a good way, or a well-supported library, for merging async iterators in python3?

The desired behavior is basically the same as that of merging observables in ReactiveX.

That is, in the normal case, if I'm merging two async iterators, I want the resulting async iterator to yield results chronologically. An error in any one of the iterators should derail the merged iterator.

(Source: http://reactivex.io/documentation/operators/merge.html)

This is my best attempt, but this seems like the kind of problem that might already have a standard solution:

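import asyncio


async def drain(stream, q, sentinel=None):
    # Forward every item from one stream into the shared queue.
    try:
        async for item in stream:
            await q.put(item)
        if sentinel is not None:
            # Signal that this stream finished normally.
            await q.put(sentinel)
    except Exception as e:
        # Hand the error to the consumer; cancellation is not caught here.
        await q.put(e)


async def merge(*streams):
    q = asyncio.Queue()
    sentinel = object()  # unique end-of-stream marker

    tasks = {
        asyncio.ensure_future(drain(stream, q, sentinel)) for stream in streams
    }

    remaining = len(streams)
    try:
        while remaining > 0:
            result = await q.get()
            if result is sentinel:
                remaining -= 1
                continue
            if isinstance(result, Exception):
                raise result
            yield result
    finally:
        # Stop the remaining drain tasks on error or early exit.
        for task in tasks:
            task.cancel()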


if __name__ == "__main__":

    # Example: Should print:
    #   1
    #   2
    #   3
    #   4

    async def gen():
        yield 1
        await asyncio.sleep(1.5)
        yield 3

    async def gen2():
        await asyncio.sleep(1)
        yield 2
        await asyncio.sleep(1)
        yield 4

    async def go():
        async for x in merge(gen(), gen2()):
            print(x)

    asyncio.run(go())

Answer 1:

You can use aiostream.stream.merge:

from aiostream import stream

async def go():
    async for x in stream.merge(gen(), gen2()):
        print(x)

More examples in the documentation and this answer.
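
If adding a dependency is not an option, roughly the same behavior can be sketched with the standard library alone. The version below is only an illustration, not how aiostream is implemented: it keeps one pending `__anext__()` task per source and uses `asyncio.wait` to pick up whichever finishes first. The names `merge_with_wait`, `failing`, and `collect` are made up for this example, and `gen` and `gen2` are the generators from the question with shorter delays.

```python
import asyncio


async def merge_with_wait(*streams):
    """Yield items from several async iterators as each one becomes ready."""
    # Map each in-flight __anext__() task back to the iterator it reads from.
    pending = {}
    for stream in streams:
        iterator = stream.__aiter__()
        pending[asyncio.ensure_future(iterator.__anext__())] = iterator
    try:
        while pending:
            done, _ = await asyncio.wait(
                list(pending), return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                iterator = pending.pop(task)
                try:
                    item = task.result()
                except StopAsyncIteration:
                    continue  # this source is exhausted; keep reading the others
                # Any other exception propagates and derails the merged stream.
                pending[asyncio.ensure_future(iterator.__anext__())] = iterator
                yield item
    finally:
        # On error or early exit, cancel the reads that are still in flight.
        for task in pending:
            task.cancel()


async def gen():
    yield 1
    await asyncio.sleep(0.3)
    yield 3


async def gen2():
    await asyncio.sleep(0.2)
    yield 2
    await asyncio.sleep(0.2)
    yield 4


async def failing():
    await asyncio.sleep(0.1)
    raise ValueError("boom")
    yield  # unreachable; its presence makes this an async generator


async def collect(*streams):
    """Gather merged items into a list, recording a ValueError if one occurs."""
    seen = []
    try:
        async for item in merge_with_wait(*streams):
            seen.append(item)
    except ValueError as exc:
        seen.append(f"error: {exc}")
    return seen


if __name__ == "__main__":
    print(asyncio.run(collect(gen(), gen2())))
    print(asyncio.run(collect(gen(), failing())))
```

Compared with the queue-based attempt in the question, this sketch needs no sentinel object, and an exception from any source surfaces directly through `task.result()`. It does rely on timing for ordering, so items that become ready in the same event-loop iteration may come out in either order.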