I'm trying to consume multiple queues concurrently using Python, asyncio and asynqp.
I don't understand why my asyncio.sleep() function call does not have any effect. The code doesn't pause there. To be fair, I actually don't understand in which context the callback is executed, and whether I can yield control back to the event loop at all (so that the asyncio.sleep() call would make sense).
What if I had to use an aiohttp.ClientSession.get() function call in my process_msg callback function? I'm not able to do it since it's not a coroutine. There has to be a way which is beyond my current understanding of asyncio.
#!/usr/bin/env python3
import asyncio
import asynqp

USERS = {'betty', 'bob', 'luis', 'tony'}


def process_msg(msg):
    asyncio.sleep(10)
    print('>> {}'.format(msg.body))
    msg.ack()


async def connect():
    connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
    channel = await connection.open_channel()
    exchange = await channel.declare_exchange('inboxes', 'direct')

    # Set up a queue for each user.
    # Use different channels to avoid any interference
    # during message consumption, just in case.
    for username in USERS:
        user_channel = await connection.open_channel()
        queue = await user_channel.declare_queue('Inbox_{}'.format(username))
        await queue.bind(exchange, routing_key=username)
        await queue.consume(process_msg)

    # deliver 10 messages to each user
    for username in USERS:
        for msg_idx in range(10):
            msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
            exchange.publish(msg, routing_key=username)


loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()
Because asyncio.sleep() returns a coroutine object that does nothing until it is awaited or scheduled on an event loop; calling it without await just creates the object and throws it away. You can't use await in a plain def declaration, because the callback is called outside of the async/await context, which is attached to some event loop under the hood. In other words, mixing callback style with async/await style is quite tricky.
The simple solution, though, is to schedule the work back to the event loop: make process_msg a coroutine and pass the consumer a plain wrapper, _process_msg, that only creates a task for it.
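A minimal sketch of that approach, under a few assumptions: FakeMessage is a hypothetical stand-in for an asynqp message so the example runs without a broker, the sleep is shortened from 10 seconds, and demo() only exists to drive the callback by hand.

```python
import asyncio


class FakeMessage:
    """Hypothetical stand-in for an asynqp message (not part of asynqp)."""

    def __init__(self, body):
        self.body = body
        self.acked = False

    def ack(self):
        self.acked = True


async def process_msg(msg):
    # The real work lives in a coroutine, so awaiting is allowed here.
    await asyncio.sleep(0.1)  # shortened from 10 seconds for the demo
    print('>> {}'.format(msg.body))
    msg.ack()


def _process_msg(msg):
    # Plain callback: it only schedules the coroutine and returns at once.
    loop = asyncio.get_event_loop()
    loop.create_task(process_msg(msg))


async def demo():
    msg = FakeMessage('hello')
    _process_msg(msg)                 # what the consumer would call per delivery
    acked_immediately = msg.acked     # still False: the body has not run yet
    await asyncio.sleep(0.2)          # hand control back so the task can run
    return acked_immediately, msg.acked


result = asyncio.run(demo())
```

In the question's code, the wrapper rather than the coroutine would be handed to the consumer, i.e. await queue.consume(_process_msg).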
Note that there is no recursion in the _process_msg function, i.e. the body of process_msg is not executed while in _process_msg. The inner process_msg function will be called once control goes back to the event loop.
This can be generalized with a small helper that wraps any coroutine function in a plain callback which schedules it on the event loop.
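One way such a generalization might look is sketched below. The helper name async_to_callback is an assumption made for illustration, and process_msg here takes a plain string instead of an asynqp message so the example runs on its own.

```python
import asyncio


def async_to_callback(coro_func):
    """Hypothetical helper: wrap a coroutine function in a plain callback."""

    def callback(*args, **kwargs):
        # Create the coroutine and schedule it; do not wait for it here.
        asyncio.ensure_future(coro_func(*args, **kwargs))

    return callback


processed = []


async def process_msg(body):
    await asyncio.sleep(0.05)  # any awaitable work could go here
    processed.append(body)


async def demo():
    callback = async_to_callback(process_msg)
    for body in ('a', 'b', 'c'):
        callback(body)               # returns immediately each time
    before = list(processed)         # nothing has been processed yet
    await asyncio.sleep(0.2)         # let the scheduled coroutines finish
    return before, sorted(processed)


before, after = asyncio.run(demo())
```

With a helper like this, the consumer line would become await queue.consume(async_to_callback(process_msg)), and the three scheduled coroutines above sleep concurrently rather than one after another.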
See Drizzt1991's response on GitHub for a solution.