I'm trying to run this simple code with asyncio queues while catching exceptions, including nested exceptions.
I would like some help with making queues in asyncio work correctly:
```python
import asyncio, logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)

num_workers = 1
in_queue = asyncio.Queue()
out_queue = asyncio.Queue()
tasks = []

async def run():
    for request in range(1):
        await in_queue.put(request)

    # each task consumes from 'input_queue' and produces to 'output_queue':
    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    # tasks.append(asyncio.create_task(saver()))

    print('waiting for queues...')
    await in_queue.join()
    # await out_queue.join()
    print('all queues done')

    for task in tasks:
        task.cancel()
    print('waiting until all tasks cancelled')
    await asyncio.gather(*tasks, return_exceptions=True)
    print('done')

async def worker(name):
    while True:
        try:
            print(f"{name} started")
            num = await in_queue.get()
            print(f'{name} got {num}')
            await asyncio.sleep(0)
            # await out_queue.put(num)
        except Exception as e:
            print(f"{name} exception {e}")
        finally:
            print(f"{name} ended")
            in_queue.task_done()

async def saver():
    while True:
        try:
            print("saver started")
            num = await out_queue.get()
            print(f'saver got {num}')
            await asyncio.sleep(0)
            print("saver ended")
        except Exception as e:
            print(f"saver exception {e}")
        finally:
            out_queue.task_done()

asyncio.run(run(), debug=True)
print('Done!')
```
Output:

```
waiting for queues...
worker-0 started
worker-0 got 0
worker-0 ended
worker-0 started
worker-0 exception
worker-0 ended
ERROR:asyncio:unhandled exception during asyncio.run() shutdown
task: <Task finished coro=<worker() done, defined at temp4.py:34> exception=ValueError('task_done() called too many times') created at Python37\lib\asyncio\tasks.py:325>
Traceback (most recent call last):
  File "Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
    return future.result()
  File "temp4.py", line 23, in run
    await in_queue.join()
  File "Python37\lib\asyncio\queues.py", line 216, in join
    await self._finished.wait()
  File "Python37\lib\asyncio\locks.py", line 293, in wait
    await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "temp4.py", line 46, in worker
    in_queue.task_done()
  File "Python37\lib\asyncio\queues.py", line 202, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times

Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1664, in <module>
    main()
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1658, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1068, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "temp4.py", line 63, in <module>
    asyncio.run(run(), debug=True)
  File "Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
    return future.result()
  File "temp4.py", line 23, in run
    await in_queue.join()
  File "Python37\lib\asyncio\queues.py", line 216, in join
    await self._finished.wait()
  File "Python37\lib\asyncio\locks.py", line 293, in wait
    await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop
```
This is the basic flow. What I would like to do later is run more requests on more workers, where each worker moves the number from `in_queue` to `out_queue`, and the saver then prints the numbers from `out_queue`.
Your queues must be created inside the loop. You created them outside the loop created for `asyncio.run()`, so they use `events.get_event_loop()`. `asyncio.run()` creates a *new* loop, and futures created for the queue in one loop can't then be used in the other.

Create your queues in your top-level `run()` coroutine, and either pass them to the coroutines that need them, or use `contextvars.ContextVar` objects if you must use globals.

You also need to clean up how you handle cancellation inside your tasks. A task is cancelled by raising an `asyncio.CancelledError` exception in the task. You can ignore it, but if you catch it to do clean-up work, you must re-raise it. Your task code catches all exceptions without re-raising, including `CancelledError`, so you block proper cancellation.

Instead, what does happen during cancellation is that you call `queue.task_done()`; don't do that, at least not when your task is being cancelled. You should only call `task_done()` when you actually are handling a queue item, but your code calls `task_done()` when an exception occurs while waiting for a queue item to appear.

If you need to use `try: ... finally: in_queue.task_done()`, put it around the block of code that handles an item received from the queue, and keep the `await in_queue.get()` outside of that `try` block. You don't want to mark as done items you didn't actually receive.

Finally, when you print exceptions, you want to print their `repr()`; for historical reasons, the `str()` conversion of exceptions produces their `.args` value, which is not very helpful for `CancelledError` exceptions, which have an empty `.args`. Use `{e!r}` in formatted strings, so you can see what exception you are catching.

So the corrected code needs the `saver()` task enabled, the queues created inside of `run()`, and the task exception handling cleaned up.
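The corrected code block was lost from this copy of the answer. The following is a reconstruction based on the fixes described above; the five requests and two workers are illustrative numbers, not taken from the original:

```python
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)

num_workers = 2

async def run():
    # create the queues inside the loop that asyncio.run() starts
    in_queue = asyncio.Queue()
    out_queue = asyncio.Queue()

    for request in range(5):
        await in_queue.put(request)

    # each worker moves numbers from in_queue to out_queue
    tasks = [
        asyncio.create_task(worker(f'worker-{i}', in_queue, out_queue))
        for i in range(num_workers)
    ]
    tasks.append(asyncio.create_task(saver(out_queue)))

    print('waiting for queues...')
    await in_queue.join()
    await out_queue.join()
    print('all queues done')

    for task in tasks:
        task.cancel()
    print('waiting until all tasks cancelled')
    await asyncio.gather(*tasks, return_exceptions=True)
    print('done')

async def worker(name, in_queue, out_queue):
    while True:
        num = await in_queue.get()   # get() stays outside the try block
        try:
            print(f'{name} got {num}')
            await asyncio.sleep(0)
            await out_queue.put(num)
        except asyncio.CancelledError:
            # re-raise so cancellation propagates; in Python 3.7
            # CancelledError is still a subclass of Exception
            raise
        except Exception as e:
            print(f'{name} exception {e!r}')
        finally:
            in_queue.task_done()     # only for items actually received

async def saver(out_queue):
    while True:
        num = await out_queue.get()
        try:
            print(f'saver got {num}')
            await asyncio.sleep(0)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            print(f'saver exception {e!r}')
        finally:
            out_queue.task_done()

asyncio.run(run(), debug=True)
print('Done!')
```

With this version the joins complete, the workers and the saver are cancelled cleanly while blocked in `get()` (which sits outside the `try`), and `gather(..., return_exceptions=True)` absorbs the `CancelledError` results.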
If you want to use globals to share the queue objects, then use `ContextVar` objects. You still create the queues in `run()`, but if you were to start multiple loops, the `contextvars` module integration would take care of keeping the queues separate.
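The code block that originally followed was also lost here. A minimal sketch of the `ContextVar` approach, assuming per-loop `Queue` objects published through module-level context variables (the names and the three-request, two-worker setup are illustrative), could look like:

```python
import asyncio
from contextvars import ContextVar

# Module-level *handles*; the actual Queue objects are set per-loop in run().
in_queue: ContextVar[asyncio.Queue] = ContextVar('in_queue')
out_queue: ContextVar[asyncio.Queue] = ContextVar('out_queue')

async def worker(name):
    # ContextVar.get() returns the queues set in the context this task
    # was created in, so each loop sees its own queues.
    queue, results = in_queue.get(), out_queue.get()
    while True:
        num = await queue.get()
        try:
            await results.put(num)
        finally:
            queue.task_done()

async def saver():
    results = out_queue.get()
    while True:
        num = await results.get()
        try:
            print(f'saver got {num}')
        finally:
            results.task_done()

async def run():
    # Create the queues inside the running loop, then publish them through
    # the context variables; tasks created after this point inherit a copy
    # of the current context and so see these queues.
    in_queue.set(asyncio.Queue())
    out_queue.set(asyncio.Queue())

    for request in range(3):
        await in_queue.get().put(request)

    tasks = [asyncio.create_task(worker(f'worker-{i}')) for i in range(2)]
    tasks.append(asyncio.create_task(saver()))

    await in_queue.get().join()
    await out_queue.get().join()

    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(run())
```

This relies on `asyncio` tasks capturing a copy of the current context at creation time, so the values set in `run()` are visible inside `worker()` and `saver()` without passing the queues as arguments.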