Similar Question (but answer does not work for me): How to cancel long-running subprocesses running using concurrent.futures.ProcessPoolExecutor?
Unlike the question linked above and the solution provided, in my case the computation itself is rather long (CPU bound) and cannot be run in a loop to check if some event has happened.
Reduced version of the code below:
```python
import asyncio
import concurrent.futures as futures
import time


class Simulator:
    def __init__(self):
        self._loop = None
        self._lmz_executor = None
        self._tasks = []
        self._max_execution_time = time.monotonic() + 60
        self._long_running_tasks = []

    def initialise(self):
        # Initialise the main asyncio loop
        self._loop = asyncio.get_event_loop()
        self._loop.set_default_executor(
            futures.ThreadPoolExecutor(max_workers=3))

        # Run separate processes of long computation task
        self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)

    def run(self):
        self._tasks.extend(
            [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
        )

        try:
            # Gather bot reasoner tasks
            _reasoner_tasks = asyncio.gather(*self._tasks)
            # Send the reasoner tasks to main monitor task
            asyncio.gather(self.sample_main_loop(_reasoner_tasks))
            self._loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            self._loop.close()

    async def sample_main_loop(self, reasoner_tasks):
        """This is the main monitor task"""
        await asyncio.wait_for(reasoner_tasks, None)
        for task in self._long_running_tasks:
            try:
                await asyncio.wait_for(task, 10)
            except asyncio.TimeoutError:
                print("Oops. Some long operation timed out.")
                task.cancel()  # Doesn't cancel and has no effect
                task.set_result(None)  # Doesn't seem to have an effect

        self._lmz_executor.shutdown()
        self._loop.stop()
        print('And now I am done. Yay!')

    async def bot_reasoning_loop(self, bot):
        import math

        _exec_count = 0
        _sleepy_time = 15
        _max_runs = math.floor(self._max_execution_time / _sleepy_time)
        self._long_running_tasks.append(
            self._loop.run_in_executor(
                self._lmz_executor, really_long_process, _sleepy_time))

        while time.monotonic() < self._max_execution_time:
            print("Bot#{}: thinking for {}s. Run {}/{}".format(
                bot, _sleepy_time, _exec_count, _max_runs))
            await asyncio.sleep(_sleepy_time)
            _exec_count += 1

        print("Bot#{} Finished Thinking".format(bot))


def really_long_process(sleepy_time):
    print("I am a really long computation.....")
    _large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed this large value: {}".format(_large_val))


if __name__ == "__main__":
    sim = Simulator()
    sim.initialise()
    sim.run()
```
The idea is that there is a main simulation loop that runs and monitors three bot threads. Each of these bot threads performs some reasoning, but also starts a really long background process using `ProcessPoolExecutor`, which may end up running longer than their own threshold/max execution time for reasoning on things.

As you can see in the code above, I attempted to `.cancel()` these tasks when a timeout occurs. This does not really cancel the actual computation, which keeps happening in the background, and the `asyncio` loop doesn't terminate until all the long-running computations have finished.
How do I terminate such long running CPU-bound computations within a method?
The approach you tried doesn't work because the futures returned by `ProcessPoolExecutor` are not cancellable. Although asyncio's `run_in_executor` tries to propagate the cancellation, it is simply ignored by `Future.cancel` once the task starts executing.

There is no fundamental reason for that. Unlike threads, processes can be safely terminated, so it would be perfectly possible for `ProcessPoolExecutor.submit` to return a future whose `cancel` terminated the corresponding process. Asyncio coroutines have defined cancellation semantics and would automatically make use of it. Unfortunately, `ProcessPoolExecutor.submit` returns a regular `concurrent.futures.Future`, which assumes the lowest common denominator and treats a running future as untouchable.

As a result, to cancel tasks executed in subprocesses, one must circumvent the `ProcessPoolExecutor` altogether and manage one's own processes. The challenge is how to do this without reimplementing half of `multiprocessing`. One option offered by the standard library is to (ab)use `multiprocessing.Pool` for this purpose, because it supports reliable shutdown of worker processes. A `CancellablePool` could work as follows:

- Instead of spawning a fixed number of processes, spawn a fixed number of one-worker pools.
- Assign tasks to pools from an asyncio coroutine. If the coroutine is cancelled while waiting for a task to finish in another process, terminate the single-worker pool and replace it with a fresh one.
- Since everything is coordinated from the single asyncio thread, don't worry about race conditions such as accidentally terminating a process which has already started executing another task. (This would need to be prevented if one were to support cancellation in `ProcessPoolExecutor`.)
Here is a sample implementation of that idea, followed by a minimalistic test case showing cancellation:
Note how the CPU usage never exceeds 3 cores, and how it starts dropping near the end of the test, indicating that the processes are being terminated as expected.
To apply it to the code from the question, make `self._lmz_executor` an instance of `CancellablePool` and change `self._loop.run_in_executor(...)` to `self._loop.create_task(self._lmz_executor.apply(...))`.