Share queue in event loop

2019-04-15 17:24发布

问题:

Is it possible to share an asyncio.Queue over different tasks in one event loop?

The usecase:

Two tasks are publishing data on a queue, and one task is grabbing the new items from the Queue. All tasks in an asynchronous way.

main.py

import asyncio
import creator


async def pull_message(queue):
    while True:
        # Here I dont get messages, maybe the queue is always
        # occupied by a other task? 
        msg = await queue.get()
        print(msg)

if __name__ == "__main__"
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(loop=loop)
    future = asyncio.ensure_future(pull_message(queue))

    creators = list()
    for i in range(2):
        creators.append(loop.create_task(cr.populate_msg(queue)))

    # add future to creators for easy handling
    creators.append(future)
    loop.run_until_complete(asyncio.gather(*creators))

creator.py

import asyncio

async def populate_msg(queue):
    while True:
        msg = "Foo"
        await queue.put(msg)

回答1:

The problem in your code is that populate_msg doesn't yield to the event loop because the queue is unbounded. This is somewhat counter-intuitive because the coroutine clearly contains an await, but that await only suspends the execution of the coroutine if the coroutine would otherwise block. Since put() on an unbounded queue never blocks, populate_msg is the only thing executed by the event loop.

The problem will go away once you change populate_msg to actually do something else (like await a network event). For testing purposes you can add await asyncio.sleep(0) inside the loop, which will force the coroutine to yield control to the event loop at every iteration of the while loop. Note that this will cause the event loop to spend an entire core by continuously spinning the loop.