I have a python multi-threaded application. I want to run an asyncio loop in a thread and post calbacks and coroutines to it from another thread. Should be easy but I cannot get my head around the asyncio stuff.
I came up to the following solution which does half of what I want, feel free to comment on anything:
import asyncio
from threading import Thread
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop) #why do I need that??
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
f = functools.partial(self.loop.create_task, coro)
return self.loop.call_soon_threadsafe(f)
def cancel_task(self, xx):
#no idea
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?
b.stop()
So starting and stoping the loop works fine. I thought about creating task using create_task, but that method is not threadsafe so I wrapped it in call_soon_threadsafe. But I would like to be able to get the task object in order to be able to cancel the task. I could do a complicated stuff using Future and Condition, but there must be a simplier way, isnt'it?
You do everything right. For task stopping make method
BTW you have to setup an event loop for the created thread explicitly by
because
asyncio
creates implicit event loop only for main thread.Since version 3.4.4
asyncio
provides a function called run_coroutine_threadsafe to submit a coroutine object from a thread to an event loop. It returns a concurrent.futures.Future to access the result or cancel the task.Using your example:
I think you may need to make your
add_task
method aware of whether or not its being called from a thread other than the event loop's. That way, if it's being called from the same thread, you can just callasyncio.async
directly, otherwise, it can do some extra work to pass the task from the loop's thread to the calling thread. Here's an example:First, we save the thread id of the event loop in the
run
method, so we can figure out if calls toadd_task
are coming from other threads later. Ifadd_task
is called from a non-event loop thread, we usecall_soon_threadsafe
to call a function that will both schedule the coroutine, and then use aconcurrent.futures.Future
to pass the task back to the calling thread, which waits on the result of theFuture
.A note on cancelling a task: You when you call
cancel
on aTask
, aCancelledError
will be raised in the coroutine the next time the event loop runs. This means that the coroutine that the Task is wrapping will aborted due to the exception the next time it hit a yield point - unless the coroutine catches theCancelledError
and prevents itself from aborting. Also note that this only works if the function being wrapped is actually an interruptible coroutine; anasyncio.Future
returned byBaseEventLoop.run_in_executor
, for example, can't really be cancelled, because it's actually wrapped around aconcurrent.futures.Future
, and those can't be cancelled once their underlying function actually starts executing. In those cases, theasyncio.Future
will say its cancelled, but the function actually running in the executor will continue to run.Edit: Updated the first example to use
concurrent.futures.Future
, instead of aqueue.Queue
, per Andrew Svetlov's suggestion.Note:
asyncio.async
is deprecated since version 3.4.4 useasyncio.ensure_future
instead.just for reference here it the code I finally implemented based on the the help I got on this site, it is simpler since I did not need all features. thanks again!