Let's assume I have the following code:
import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()
The problem with this code is that the loop inside the async coroutine never finishes its first iteration, while the queue size keeps increasing.
Why does this happen, and what can I do to fix it?
I can't get rid of the separate thread, because in my real code I use a separate thread to communicate with a serial device, and I haven't found a way to do that using asyncio.
asyncio.Queue is not thread-safe, so you can't use it directly from more than one thread. Instead, you can use janus, which is a third-party library that provides a thread-aware asyncio queue:
import asyncio
import threading
import janus

def threaded(squeue):
    import time
    while True:
        time.sleep(2)
        squeue.put_nowait(time.time())
        print(squeue.qsize())

@asyncio.coroutine
def async(aqueue):
    while True:
        time = yield from aqueue.get()
        print(time)

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
# Schedule the consumer coroutine, passing it the async side of the queue.
asyncio.ensure_future(async(queue.async_q))
# The worker thread gets the synchronous side of the same queue.
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()
There is also aioprocessing (full disclosure: I wrote it), which provides process-safe (and as a side effect, thread-safe) queues as well, but that's overkill if you're not trying to use multiprocessing.
If you do not want to use another library, you can schedule a coroutine from the thread. Replacing the queue.put_nowait call with the following works fine:
asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)
The variable loop represents the event loop in the main thread.
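For reference, here is a minimal end-to-end sketch of this approach in current async/await syntax. The names producer, consume and run_demo are hypothetical, the short sleep stands in for a blocking read from the serial device, and the producer is bounded so the example terminates.

```python
import asyncio
import threading
import time


def producer(queue, loop, count):
    """Runs in a worker thread and hands each item to the loop thread."""
    for i in range(count):
        time.sleep(0.01)  # stand-in for blocking I/O such as a serial read
        # Schedule queue.put(i) on the event loop from this thread.
        future = asyncio.run_coroutine_threadsafe(queue.put(i), loop)
        # Block this thread (not the loop) until the put has completed.
        future.result()


async def consume(count):
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=producer, args=(queue, loop, count))
    thread.start()
    items = [await queue.get() for _ in range(count)]
    # Join in an executor so the loop keeps running while the thread exits;
    # a plain thread.join() here could block the loop the producer waits on.
    await loop.run_in_executor(None, thread.join)
    return items


def run_demo(count=3):
    return asyncio.run(consume(count))
```

Calling run_demo(3) should return the items in the order they were produced, because the producer waits for each put to finish before sending the next one.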
EDIT:
The reason your async coroutine is not doing anything is that the event loop never gives it a chance to do so. The queue object is not thread-safe, and if you dig through the CPython code you find that this means put_nowait wakes up consumers of the queue by resolving a future, whose callbacks are scheduled with the event loop's call_soon method. If we could make it use call_soon_threadsafe, it should work. The major difference between call_soon and call_soon_threadsafe, however, is that call_soon_threadsafe wakes up the event loop by calling loop._write_to_self(). So let's call it ourselves:
import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        queue._loop._write_to_self()
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()
Then, everything works as expected.
As for the thread-safety of accessing shared objects, asyncio.Queue uses collections.deque under the hood, which has thread-safe append and popleft.
Checking that the queue is not empty and then calling popleft may not be atomic, but if you consume the queue from only one thread (the one running the event loop), it could be fine.
The other proposed solutions, loop.call_soon_threadsafe from Huazuo Gao's answer and my asyncio.run_coroutine_threadsafe, are just doing this: waking up the event loop.
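The same wake-up mechanism applies to any object owned by the loop, not only queues. As a small sketch (worker, wait_for_thread_value and run_demo are hypothetical names), a plain asyncio future can be resolved from another thread by routing set_result through call_soon_threadsafe, which both schedules the call on the loop thread and wakes the loop:

```python
import asyncio
import threading


def worker(loop, future, value):
    """Runs in a worker thread."""
    # Do not call future.set_result(value) directly from this thread.
    # Ask the loop to run it instead; this also wakes the loop up.
    loop.call_soon_threadsafe(future.set_result, value)


async def wait_for_thread_value(value):
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    thread = threading.Thread(target=worker, args=(loop, future, value))
    thread.start()
    result = await future  # suspended until the loop runs set_result
    thread.join()          # the worker has nothing left to do by now
    return result


def run_demo(value=42):
    return asyncio.run(wait_for_thread_value(value))
```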
BaseEventLoop.call_soon_threadsafe is made for exactly this. See the asyncio documentation for details.
Simply change your threaded() like this:
def threaded():
    import time
    while True:
        time.sleep(1)
        loop.call_soon_threadsafe(queue.put_nowait, time.time())
        loop.call_soon_threadsafe(lambda: print(queue.qsize()))
Here's a sample output:
0
1443857763.3355968
0
1443857764.3368602
0
1443857765.338082
0
1443857766.3392274
0
1443857767.3403943
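For completeness, a self-contained sketch of the same idea in current async/await syntax might look like the following. It assumes a bounded producer so that it terminates, passes the loop and queue explicitly instead of relying on globals, and uses the hypothetical names main and run_demo.

```python
import asyncio
import threading
import time


def threaded(loop, queue, count):
    """Runs in a worker thread."""
    for _ in range(count):
        time.sleep(0.01)
        # put_nowait is executed by the loop thread, never by this one.
        loop.call_soon_threadsafe(queue.put_nowait, time.time())


async def main(count):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    thread = threading.Thread(
        target=threaded, args=(loop, queue, count), daemon=True
    )
    thread.start()
    stamps = []
    for _ in range(count):
        stamps.append(await queue.get())
    return stamps


def run_demo(count=3):
    return asyncio.run(main(count))
```

Because every queue operation runs on the loop thread, the queue itself needs no extra locking in this arrangement.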
What about just using threading.Lock with asyncio.Queue?
import asyncio
import queue
import threading

class ThreadSafeAsyncFuture(asyncio.Future):
    """ asyncio.Future is not thread-safe
    https://stackoverflow.com/questions/33000200/asyncio-wait-for-event-from-other-thread
    """
    def set_result(self, result):
        func = super().set_result
        call = lambda: func(result)
        self._loop.call_soon_threadsafe(call)  # Warning: self._loop is undocumented

class ThreadSafeAsyncQueue(queue.Queue):
    """ asyncio.Queue is not thread-safe, queue.Queue is not awaitable
    works only with one putter to unlimited-size queue and with several getters
    TODO: add maxsize limits
    TODO: make put coroutine
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.lock = threading.Lock()
        self.loop = asyncio.get_event_loop()
        self.waiters = []

    def put(self, item):
        with self.lock:
            if self.waiters:
                # Hand the item straight to the oldest waiting getter.
                self.waiters.pop(0).set_result(item)
            else:
                super().put(item)

    async def get(self):
        with self.lock:
            if not self.empty():
                return super().get()
            else:
                fut = ThreadSafeAsyncFuture()
                self.waiters.append(fut)
        # Await outside the lock so that put() can acquire it.
        result = await fut
        return result
See also - asyncio: Wait for event from other thread