Let's assume I have the following code:
import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()
The problem with this code is that the loop inside the async coroutine never finishes its first iteration, while the queue size keeps increasing.
Why is this happening, and what can I do to fix it?
I can't get rid of the separate thread, because in my real code I use it to communicate with a serial device, and I haven't found a way to do that using asyncio.
If you do not want to use another library, you can schedule a coroutine from the thread. Replacing the queue.put_nowait(time.time()) call with asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop) works fine. The variable loop represents the event loop in the main thread.
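A minimal, self-contained sketch of that replacement, in case it helps. It is written with async/await rather than the question's @asyncio.coroutine style, and the names threaded, main and count, the short sleep and the bounded loop are my own choices so that the script terminates; they are not part of the original code.

```python
import asyncio
import threading
import time


def threaded(loop, queue, count):
    """Producer in a plain thread (stand-in for the serial-device reader)."""
    for _ in range(count):
        time.sleep(0.05)
        # Schedule queue.put() on the event loop's own thread. The returned
        # concurrent.futures.Future is ignored here (fire and forget).
        asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)


async def main(count=3):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    thread = threading.Thread(target=threaded, args=(loop, queue, count))
    thread.start()
    received = []
    for _ in range(count):
        # The consumer is now woken up for every item the thread sends.
        received.append(await queue.get())
    # Join in a worker thread so the event loop itself is never blocked.
    await asyncio.to_thread(thread.join)
    return received


if __name__ == "__main__":
    for stamp in asyncio.run(main()):
        print(stamp)
```

If the thread needs to know that the item was actually enqueued, it can call .result() on the future returned by run_coroutine_threadsafe, but then the coroutine side must never block on the thread (for example with a plain thread.join()), or the two will wait on each other.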
The reason why your async coroutine is not doing anything is that the event loop never gives it a chance to do so. The queue object is not thread-safe, and if you dig through the CPython code you find that this means that put_nowait wakes up consumers of the queue through a future, using the call_soon method of the event loop. If we could make it use call_soon_threadsafe, it should work. The major difference between call_soon and call_soon_threadsafe, however, is that call_soon_threadsafe wakes up the event loop by calling loop._write_to_self(). So let's call it ourselves from the thread, right after queue.put_nowait. Then everything works as expected.
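Here is a rough sketch of that manual wake-up, for illustration only. Note the assumptions: _write_to_self() is a private method of CPython's built-in event loops, so it may change or be missing on other loop implementations; asyncio's debug mode rejects call_soon from a foreign thread, which is why debug=False is passed explicitly; and the names threaded, main and count plus the bounded loop are mine.

```python
import asyncio
import threading
import time


def threaded(loop, queue, count):
    """Producer thread that calls put_nowait and then wakes the loop by hand."""
    for _ in range(count):
        time.sleep(0.05)
        # Not thread-safe: the waiting consumer is scheduled via loop.call_soon,
        # which does not wake an event loop that is sleeping in select().
        queue.put_nowait(time.time())
        # ASSUMPTION: private CPython API, used only to mirror the explanation.
        loop._write_to_self()


async def main(count=3):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    thread = threading.Thread(target=threaded, args=(loop, queue, count))
    thread.start()
    received = []
    for _ in range(count):
        received.append(await queue.get())
    # Join in a worker thread so the event loop itself is never blocked.
    await asyncio.to_thread(thread.join)
    return received


if __name__ == "__main__":
    # debug=False: in debug mode asyncio raises on cross-thread call_soon.
    for stamp in asyncio.run(main(), debug=False):
        print(stamp)
```

I would treat this as a way to understand what is going on rather than something to ship, since it relies on internals and on the producer being the only foreign thread touching the queue.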
As for the thread-safety of accessing shared objects, asyncio.Queue uses collections.deque under the hood, which has thread-safe append and popleft. Checking that the queue is not empty and then calling popleft may not be atomic, but if you consume the queue in only one thread (the one running the event loop) it could be fine.
The other proposed solutions, loop.call_soon_threadsafe from Huazuo Gao's answer and my asyncio.run_coroutine_threadsafe, are just doing this: waking up the event loop.
What about just using threading.Lock with asyncio.Queue?
See also - asyncio: Wait for event from other thread
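BaseEventLoop.call_soon_threadsafe is at hand. See the asyncio documentation for details. Simply change your threaded() so that it calls loop.call_soon_threadsafe(queue.put_nowait, time.time()) instead of calling queue.put_nowait directly.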
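As a sketch, the changed threaded() could look like the following. It uses async/await instead of the question's @asyncio.coroutine style, and the explicit loop, queue and count parameters, the short sleep and the bounded loop are assumptions of mine so that the script is self-contained and terminates.

```python
import asyncio
import threading
import time


def threaded(loop, queue, count):
    """Producer thread that hands each item to the event loop's thread."""
    for _ in range(count):
        time.sleep(0.05)
        # put_nowait itself runs inside the event loop; call_soon_threadsafe
        # queues the callback and wakes the loop up so it runs promptly.
        loop.call_soon_threadsafe(queue.put_nowait, time.time())


async def main(count=3):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    thread = threading.Thread(target=threaded, args=(loop, queue, count))
    thread.start()
    received = []
    for _ in range(count):
        received.append(await queue.get())
    # Join in a worker thread so the event loop itself is never blocked.
    await asyncio.to_thread(thread.join)
    return received


if __name__ == "__main__":
    for stamp in asyncio.run(main()):
        print(stamp)
```

Because put_nowait runs on the loop's thread here, it raises asyncio.QueueFull inside the loop if the queue is bounded and full; with the unbounded queue from the question that cannot happen.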
asyncio.Queue is not thread-safe, so you can't use it directly from more than one thread. Instead, you can use janus, which is a third-party library that provides a thread-aware asyncio queue: the thread puts items through the queue's sync_q interface, and the coroutine reads them through async_q.
There is also aioprocessing (full disclosure: I wrote it), which provides process-safe (and, as a side effect, thread-safe) queues as well, but that's overkill if you're not trying to use multiprocessing.