Is there a way to use asyncio.Queue in multiple th

Posted 2019-03-16 00:10

Let's assume I have the following code:

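import asyncio
import threading
import time

# Create the loop first so the queue and the task share it on every Python version.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
queue = asyncio.Queue()

def threaded():
    # Producer: runs in a separate thread, outside the event loop.
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

async def consume():
    # Consumer: runs inside the event loop.
    while True:
        timestamp = await queue.get()
        print(timestamp)

loop.create_task(consume())
threading.Thread(target=threaded).start()
loop.run_forever()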

The problem with this code is that the loop inside the coroutine never finishes its first iteration, while the queue size keeps growing.

Why does this happen, and what can I do to fix it?

I can't get rid of the separate thread, because in my real code I use it to communicate with a serial device, and I haven't found a way to do that using asyncio.

4 Answers
手持菜刀,她持情操
#2 · 2019-03-16 00:18

If you do not want to use another library, you can schedule a coroutine from the thread. Replacing the queue.put_nowait call with the following works fine.

asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)

The variable loop represents the event loop in the main thread.
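
As a minimal, self-contained sketch of this approach (the names producer, main and count are made up for illustration, and the producer is finite so the script terminates), the thread submits queue.put() to the loop and waits for it to complete:

```python
import asyncio
import threading
import time


def producer(queue, loop, count):
    # Runs in a worker thread: hand each put() to the loop that owns the queue.
    for _ in range(count):
        time.sleep(0.01)
        future = asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)
        future.result()  # block this thread until the loop has stored the item


async def main(count=3):
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    worker = threading.Thread(target=producer, args=(queue, loop, count))
    worker.start()
    received = [await queue.get() for _ in range(count)]
    # Join in the executor: a direct worker.join() would block the loop while
    # the thread may still be waiting on the loop inside future.result().
    await loop.run_in_executor(None, worker.join)
    return received
```

Running asyncio.run(main()) returns the list of timestamps. Waiting on future.result() is optional; it is used here so that any exception raised by put() would surface in the producer thread.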

EDIT:

The reason your coroutine never makes progress is that the event loop never gets a chance to run it. The queue object is not thread-safe: if you dig through the CPython source, you find that put_nowait wakes up consumers of the queue by completing a future, which schedules its callbacks with the event loop's call_soon method. If it used call_soon_threadsafe instead, it would work. The major difference between call_soon and call_soon_threadsafe, however, is that call_soon_threadsafe also wakes up the event loop by calling loop._write_to_self(). So let's call it ourselves:

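import asyncio
import threading
import time

# Create the loop first so the queue and the task share it on every Python version.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
queue = asyncio.Queue()

def threaded():
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        # Private API, may change between releases: wakes the sleeping loop.
        # put_nowait above is still an off-thread call, which asyncio's debug mode rejects.
        loop._write_to_self()
        print(queue.qsize())

async def consume():
    while True:
        timestamp = await queue.get()
        print(timestamp)

loop.create_task(consume())
threading.Thread(target=threaded).start()
loop.run_forever()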

Then, everything works as expected.

As for the thread-safety of accessing shared objects, asyncio.Queue uses collections.deque under the hood, which has thread-safe append and popleft. Checking that the queue is not empty and then calling popleft may not be atomic, but if you consume the queue in only one thread (the event loop's), it could be fine.

The other proposed solutions, loop.call_soon_threadsafe from Huazuo Gao's answer and my asyncio.run_coroutine_threadsafe, do just this: they wake up the event loop.

Luminary・发光体
#3 · 2019-03-16 00:25

What about just using threading.Lock with asyncio.Queue?

import asyncio
import queue
import threading


class ThreadSafeAsyncFuture(asyncio.Future):
    """asyncio.Future is not thread-safe.
    https://stackoverflow.com/questions/33000200/asyncio-wait-for-event-from-other-thread
    """
    def set_result(self, result):
        func = super().set_result
        call = lambda: func(result)
        # Let the loop thread complete the future; this also wakes a sleeping loop.
        self.get_loop().call_soon_threadsafe(call)


class ThreadSafeAsyncQueue(queue.Queue):
    """asyncio.Queue is not thread-safe, queue.Queue is not awaitable.
    Works only with one putter to an unlimited-size queue and with several getters.
    TODO: add maxsize limits
    TODO: make put a coroutine
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.lock = threading.Lock()
        # Create the queue from the event-loop thread so this picks up the right loop.
        self.loop = asyncio.get_event_loop()
        self.waiters = []

    def put(self, item):
        with self.lock:
            if self.waiters:
                # A getter is already waiting: hand the item over directly.
                self.waiters.pop(0).set_result(item)
            else:
                super().put(item)

    async def get(self):
        with self.lock:
            if not self.empty():
                # Non-empty and guarded by the lock, so this never blocks.
                return super().get_nowait()
            else:
                fut = ThreadSafeAsyncFuture(loop=self.loop)
                self.waiters.append(fut)
        result = await fut
        return result

See also - asyncio: Wait for event from other thread
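
The idea behind ThreadSafeAsyncFuture can be shown in isolation. This is only a sketch with hypothetical names (worker, wait_for_thread): a plain thread completes a future by asking the loop to call set_result, instead of calling it directly.

```python
import asyncio
import threading


def worker(loop, future, value):
    # Runs in a plain thread. Future.set_result is not thread-safe, so ask
    # the loop to call it; call_soon_threadsafe also wakes a sleeping loop.
    loop.call_soon_threadsafe(future.set_result, value)


async def wait_for_thread(value):
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    thread = threading.Thread(target=worker, args=(loop, future, value))
    thread.start()
    result = await future  # suspends until the loop has run set_result
    thread.join()  # the thread does not depend on the loop, so this returns promptly
    return result
```

For example, asyncio.run(wait_for_thread("ready")) returns "ready".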

等我变得足够好
#4 · 2019-03-16 00:29

BaseEventLoop.call_soon_threadsafe is available for exactly this. See the asyncio documentation for details.

Simply change your threaded() like this:

def threaded():
    import time
    while True:
        time.sleep(1)
        loop.call_soon_threadsafe(queue.put_nowait, time.time())
        loop.call_soon_threadsafe(lambda: print(queue.qsize()))

Here's a sample output:

0
1443857763.3355968
0
1443857764.3368602
0
1443857765.338082
0
1443857766.3392274
0
1443857767.3403943
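
The loop above never terminates. A finite variant of the same technique might look like the following sketch; the STOP sentinel and the names producer and consume are assumptions added for illustration, not part of the answer.

```python
import asyncio
import threading
import time

STOP = object()  # hypothetical sentinel marking the end of the stream


def producer(loop, queue, count):
    # Worker thread: every queue operation is executed by the loop thread.
    for _ in range(count):
        time.sleep(0.01)
        loop.call_soon_threadsafe(queue.put_nowait, time.time())
    loop.call_soon_threadsafe(queue.put_nowait, STOP)


async def consume(count=3):
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=producer, args=(loop, queue, count))
    thread.start()
    items = []
    while True:
        item = await queue.get()
        if item is STOP:
            break
        items.append(item)
    thread.join()  # the producer has already scheduled everything it had
    return items
```

Callbacks scheduled with call_soon_threadsafe run in the order they were submitted, so the sentinel arrives after the last timestamp.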

我命由我不由天
#5 · 2019-03-16 00:36

asyncio.Queue is not thread-safe, so you can't use it directly from more than one thread. Instead, you can use janus, which is a third-party library that provides a thread-aware asyncio queue:

import asyncio
import threading
import time

import janus

def threaded(squeue):
    # Producer thread: uses the synchronous side of the queue.
    while True:
        time.sleep(2)
        squeue.put_nowait(time.time())
        print(squeue.qsize())

async def consume(aqueue):
    # Consumer coroutine: uses the asynchronous side of the queue.
    while True:
        timestamp = await aqueue.get()
        print(timestamp)

async def main():
    # Assumes a recent janus release, where Queue() takes no loop argument
    # and is created while the event loop is running.
    queue = janus.Queue()
    threading.Thread(target=threaded, args=(queue.sync_q,)).start()
    await consume(queue.async_q)

asyncio.run(main())

There is also aioprocessing (full-disclosure: I wrote it), which provides process-safe (and as a side-effect, thread-safe) queues as well, but that's overkill if you're not trying to use multiprocessing.
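
If adding a dependency is not an option, one standard-library pattern (not taken from the answer above, and only a sketch with hypothetical names) keeps a regular queue.Queue for the thread and awaits its blocking get() through the loop's default executor:

```python
import asyncio
import queue
import threading
import time


def producer(sync_queue, count):
    # Plain thread using the thread-safe queue.Queue API directly.
    for _ in range(count):
        time.sleep(0.01)
        sync_queue.put(time.time())


async def consume(count=3):
    sync_queue = queue.Queue()
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=producer, args=(sync_queue, count))
    thread.start()
    items = []
    for _ in range(count):
        # The blocking get() runs in a pool thread, so the loop stays responsive.
        items.append(await loop.run_in_executor(None, sync_queue.get))
    thread.join()
    return items
```

The trade-off is that every pending get() occupies a worker thread of the executor, which is acceptable for a single consumer but scales worse than a queue with a native async interface.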
