I would like to start a blocking function in an Executor using the asyncio call loop.run_in_executor and then cancel it later, but that doesn't seem to be working for me.
Here is the code:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_func(seconds_to_block):
for i in range(seconds_to_block):
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
@asyncio.coroutine
def non_blocking_func(seconds):
for i in range(seconds):
print('yielding {}/{}'.format(i, seconds))
yield from asyncio.sleep(1)
print('done non blocking {}'.format(seconds))
@asyncio.coroutine
def main():
non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
blocking_future = loop.run_in_executor(None, blocking_func, 5)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel()
yield from asyncio.wait(non_blocking_futures)
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()
I would expect the code above to only allow the blocking function to output:
blocking 0/5
blocking 1/5
and then see the output of the non blocking function. But instead the blocking future continues on even after I have canceled.
Is it possible? Is there some other way of doing it?
Thanks
Edit: More discussion on running blocking and non-blocking code using asyncio: How to interface blocking and non-blocking code with asyncio
As threads share the same memory address space of a process, there is no safe way to terminate a running thread. This is the reason why most programming languages do not allow to kill running threads (there are lots of ugly hacks around this limitation).
Java learnt it the hard way.
A solution would consist in running your function in a separate process instead of a thread and terinate it gracefully.
The Pebble library offers an interface similar to
concurrent.futures
supporting runningFutures
to be cancelled.In this case, there is no way to cancel the
Future
once it has actually started running, because you're relying on the behavior ofconcurrent.futures.Future
, and its docs state the following:So, the only time the cancellation would be successful is if the task is still pending inside of the
Executor
. Now, you're actually using anasyncio.Future
wrapped around aconcurrent.futures.Future
, and in practice theasyncio.Future
returned byloop.run_in_executor()
will raise aCancellationError
if you try toyield from
it after you callcancel()
, even if the underlying task is actually already running. But, it won't actually cancel the execution of the task inside theExecutor
.If you need to actually cancel the task, you'll need to use a more conventional method of interrupting the task running in the thread. The specifics of how you do that is use-case dependent. For the use-case you presented in the example, you could use a
threading.Event
: