asyncio: Wait for event from other thread

Posted 2020-07-07 10:59

I'm designing an application in Python which should access a machine to perform some (lengthy) tasks. The asyncio module seems to be a good choice for everything that is network-related, but now I need to access the serial port for one specific component. I've implemented kind of an abstraction layer for the actual serial port stuff, but can't figure out how to sensibly integrate this with asyncio.

The setup is as follows: I have a thread running a loop, which regularly talks to the machine and decodes the responses. Using a method enqueue_query(), I can put a query string into a queue; the other thread then sends it to the machine, which produces a response. By passing in a threading.Event (or anything with a set() method), the caller can perform a blocking wait for the response. This can look something like this:

f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
f.wait()
print(ch.get_query_responses())

My goal is now to put those lines into a coroutine and have asyncio handle the waiting, so that the application can do something else in the meantime. How could I do this? It would probably work to wrap f.wait() in an executor, but this seems a bit stupid, as it would create a new thread only to wait for another thread to do something.

Thanks! Best regards, Philipp

2 Answers
老娘就宠你
#2 · 2020-07-07 11:20

The simplest way is to do exactly what you suggested - wrap the call to f.wait() in an executor:

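import asyncio
import threading

async def do_enqueue():
    loop = asyncio.get_running_loop()
    f = threading.Event()
    ch.enqueue_query('2 getnlimit', f)
    # The blocking f.wait() runs in the default thread pool, not in the event loop.
    await loop.run_in_executor(None, f.wait)
    print(ch.get_query_responses())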

You do incur the overhead of starting a thread pool on the first call (the pool stays in memory from then on), and each pending wait occupies one pool thread until the event is set. However, any solution that implements something like threading.Event() with both thread-safe blocking and non-blocking APIs, without relying on background threads internally, would be quite a bit more work.
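To try the executor approach without the real hardware, here is a minimal self-contained sketch. FakeChannel is a hypothetical stand-in for the asker's ch object: only the method names enqueue_query() and get_query_responses() come from the question, while the internals, the 0.1 s delay, and the 'ok: ...' response format are assumptions made for illustration. The ticker() coroutine stands in for whatever else the application does while the wait is parked in a worker thread.

```python
import asyncio
import queue
import threading
import time


class FakeChannel:
    """Hypothetical stand-in for the serial-port abstraction in the question."""

    def __init__(self):
        self._queries = queue.Queue()
        self._responses = []
        self._lock = threading.Lock()
        threading.Thread(target=self._worker, daemon=True).start()

    def enqueue_query(self, query, event):
        self._queries.put((query, event))

    def get_query_responses(self):
        # Hand out and reset the collected responses.
        with self._lock:
            responses, self._responses = self._responses, []
        return responses

    def _worker(self):
        while True:
            query, event = self._queries.get()
            time.sleep(0.1)  # assumed delay: pretend the machine needs a moment
            with self._lock:
                self._responses.append('ok: ' + query)
            event.set()  # wake up whoever is waiting for this query


async def query(ch, text):
    loop = asyncio.get_running_loop()
    done = threading.Event()
    ch.enqueue_query(text, done)
    # The blocking wait runs in the default thread pool, not in the event loop.
    await loop.run_in_executor(None, done.wait)
    return ch.get_query_responses()


async def ticker(ticks):
    # Keeps running on the event loop while query() is waiting.
    for _ in range(5):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def demo():
    ch = FakeChannel()
    ticks = []
    responses, _ = await asyncio.gather(query(ch, '2 getnlimit'), ticker(ticks))
    return responses, len(ticks)


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Both coroutines finish: the ticker is not held up by the pending query, because only the pool thread is blocked.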

男人必须洒脱
#3 · 2020-07-07 11:22

"By passing in a threading.Event (or anything with a set() method), the caller can perform a blocking wait for the response."

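Given the above behavior of your query function, all you need is a version of asyncio.Event whose set() is thread-safe. It takes only a few lines of code:

import asyncio

class Event_ts(asyncio.Event):
    #TODO: clear() method
    def __init__(self):
        super().__init__()
        # Capture the loop explicitly instead of relying on the private _loop
        # attribute; the event must therefore be created inside a running loop.
        self._ts_loop = asyncio.get_running_loop()

    def set(self):
        # May be called from any thread; the real set() runs in the loop thread.
        self._ts_loop.call_soon_threadsafe(super().set)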

A test for functionality:

import threading
import time

def threaded(event):
    # Runs in a worker thread and sets the event once per second.
    while True:
        event.set()
        time.sleep(1)

async def main():
    e = Event_ts()  # created inside the running loop
    threading.Thread(target=threaded, args=(e,), daemon=True).start()
    while True:
        await e.wait()
        e.clear()
        print('whatever')

asyncio.run(main())
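Since the query function accepts anything with a set() method, a one-shot alternative that uses only public asyncio API is to hand it a small object that resolves an asyncio.Future. The sketch below is a hypothetical helper, not part of the original answer: the name FutureSetter and the timer thread that stands in for the serial-port thread are both assumptions.

```python
import asyncio
import threading


class FutureSetter:
    """One-shot object with a thread-safe set(); await .future to wait for it.

    Hypothetical helper; it must be created inside a running event loop.
    """

    def __init__(self):
        self._loop = asyncio.get_running_loop()
        self.future = self._loop.create_future()

    def set(self):
        # Safe to call from any thread: the future is only touched in the loop thread.
        self._loop.call_soon_threadsafe(self._resolve)

    def _resolve(self):
        if not self.future.done():  # tolerate repeated set() calls and cancellation
            self.future.set_result(None)


async def demo():
    notifier = FutureSetter()
    # A timer thread stands in for the serial-port thread calling set().
    threading.Timer(0.05, notifier.set).start()
    await asyncio.wait_for(notifier.future, timeout=2)
    return notifier.future.done()


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

With the API from the question, this would presumably be used by creating one FutureSetter per query, passing it to ch.enqueue_query() in place of the threading.Event, and awaiting its future before calling ch.get_query_responses().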