I'm trying to run this simple code with asyncio queues, but catch exceptions, and even nested exceptions.
I would like to get some help with making queues in asyncio work correctly:
import asyncio, logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)
num_workers = 1
in_queue = asyncio.Queue()
out_queue = asyncio.Queue()
tasks = []
async def run():
for request in range(1):
await in_queue.put(request)
# each task consumes from 'input_queue' and produces to 'output_queue':
for i in range(num_workers):
tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
# tasks.append(asyncio.create_task(saver()))
print('waiting for queues...')
await in_queue.join()
# await out_queue.join()
print('all queues done')
for task in tasks:
task.cancel()
print('waiting until all tasks cancelled')
await asyncio.gather(*tasks, return_exceptions=True)
print('done')
async def worker(name):
while True:
try:
print(f"{name} started")
num = await in_queue.get()
print(f'{name} got {num}')
await asyncio.sleep(0)
# await out_queue.put(num)
except Exception as e:
print(f"{name} exception {e}")
finally:
print(f"{name} ended")
in_queue.task_done()
async def saver():
while True:
try:
print("saver started")
num = await out_queue.get()
print(f'saver got {num}')
await asyncio.sleep(0)
print("saver ended")
except Exception as e:
print(f"saver exception {e}")
finally:
out_queue.task_done()
asyncio.run(run(), debug=True)
print('Done!')
Output:
waiting for queues...
worker-0 started
worker-0 got 0
worker-0 ended
worker-0 started
worker-0 exception
worker-0 ended
ERROR:asyncio:unhandled exception during asyncio.run() shutdown
task: <Task finished coro=<worker() done, defined at temp4.py:34> exception=ValueError('task_done() called too many times') created at Python37\lib\asyncio\tasks.py:325>
Traceback (most recent call last):
File "Python37\lib\asyncio\runners.py", line 43, in run
return loop.run_until_complete(main)
File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
return future.result()
File "temp4.py", line 23, in run
await in_queue.join()
File "Python37\lib\asyncio\queues.py", line 216, in join
await self._finished.wait()
File "Python37\lib\asyncio\locks.py", line 293, in wait
await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "temp4.py", line 46, in worker
in_queue.task_done()
File "Python37\lib\asyncio\queues.py", line 202, in task_done
raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1664, in <module>
main()
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1068, in run
pydev_imports.execfile(file, globals, locals) # execute the script
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "temp4.py", line 63, in <module>
asyncio.run(run(), debug=True)
File "Python37\lib\asyncio\runners.py", line 43, in run
return loop.run_until_complete(main)
File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
return future.result()
File "temp4.py", line 23, in run
await in_queue.join()
File "Python37\lib\asyncio\queues.py", line 216, in join
await self._finished.wait()
File "Python37\lib\asyncio\locks.py", line 293, in wait
await fut
RuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future <Future pending> attached to a different loop
This is the basic flow, what I would like to do later is run more requests on more workers where each worker will move the number from in_queue
to out_queue
and then the saver will print the numbers from out_queue
.