How to terminate a single async task in multiprocessing

Posted 2019-08-27 12:16

Question:

How can I terminate a single async task using multiprocessing on Windows if that task exceeds a threshold time (say, 60 seconds) in Python?

import multiprocessing as mp

def compute(test_num):
    return test_num ** 2

def main():
    test_num_list = [10, 20, 30, 40, 50, 60, 70, 80]

    pool = mp.Pool(processes=3)
    results = [pool.apply_async(compute, args=(test_num,)) for test_num in test_num_list]

    output = [r.get() for r in results]  # blocks until every task has finished
    print(output)

if __name__ == '__main__':  # required on Windows, where worker processes re-import this module
    main()

Answer 1:

Your question has already been answered here.

You can use the Pebble library's ProcessPool for that purpose; it lets you set a timeout on each scheduled task.



Answer 2:

As far as I know, there is no way to kill a task that has already been sent to a Pool object.

Because you control the code that runs in the child process, you can create your own 'watchdog' timer to make sure a task doesn't execute longer than a set amount of time. One way to do this is to spawn a thread that waits on an Event with a timeout and kills the worker process if the computation didn't finish in time:

from threading import Thread, Event
import multiprocessing as mp
import os

def watchdog(e):
    finished = e.wait(timeout=60)  # returns True if the Event was set in time
    if not finished:
        # sys.exit() here would only end this thread; os._exit() kills the whole worker process
        os._exit(-1)

def compute(test_num):
    return test_num ** 2

def time_limited_compute(test_num):  # use this as the function you submit to the pool
    e = Event()
    Thread(target=watchdog, args=(e,)).start()

    r = compute(test_num)

    e.set()  # tell the watchdog the computation finished
    return r
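
One caveat when writing the watchdog: sys.exit() called from a non-main thread only raises SystemExit inside that thread, and the threading module silently ignores it, so it ends the watchdog thread but leaves the worker process running. os._exit() terminates the whole process immediately. The short standalone check below (the function names are illustrative, not from the answer) shows that a process survives a sys.exit() made in another thread, and what Event.wait(timeout) returns when it times out versus when the event is set:

```python
import sys
import threading


def exit_from_thread() -> None:
    # Raises SystemExit, but only in this thread; threading ignores it silently.
    sys.exit(-1)


def process_survives_sys_exit() -> bool:
    t = threading.Thread(target=exit_from_thread)
    t.start()
    t.join()
    # Reaching this line means sys.exit() did not stop the process.
    return True


def event_wait_results(timeout: float = 0.05) -> tuple:
    e = threading.Event()
    timed_out = e.wait(timeout=timeout)  # False: nobody called e.set()
    e.set()
    signaled = e.wait(timeout=timeout)   # True: the flag is already set
    return timed_out, signaled


if __name__ == "__main__":
    print(process_survives_sys_exit())  # True
    print(event_wait_results())         # (False, True)
```

This is why the watchdog checks the return value of e.wait(): False means the deadline passed before the computation called e.set().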

Because multiprocessing.Pool replaces any worker process that dies while working on a task, the remaining tasks keep running, which should give you the result you're looking for. The result of the killed task, however, is never delivered.

Make sure you also pass a timeout to your get() calls; otherwise get() on a killed task would block forever. When the timeout expires, get() raises multiprocessing.TimeoutError, so you'll need to handle that:

def main():
    test_num_list = [10, 20, 30, 40, 50, 60, 70, 80]

    with mp.Pool(processes=3) as pool:
        results = [pool.apply_async(time_limited_compute, args=(test_num,))
                   for test_num in test_num_list]

        def get_or_none(r):
            try:
                return r.get(60)
            except mp.TimeoutError:  # the task was killed or is still running
                return None

        output = [get_or_none(r) for r in results]

    print(output)

if __name__ == '__main__':
    main()
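
To try the whole approach without waiting minutes, here is a scaled-down sketch. It assumes the os._exit() watchdog (a sys.exit() in a thread does not end the process), shortens the limits to the hypothetical WATCHDOG_SECONDS and GET_TIMEOUT_SECONDS values, and makes compute() sleep artificially for test_num == 30 to simulate a task that overruns; run_demo is likewise a hypothetical name. The overrunning task should come back as None while the others still return their squares:

```python
import multiprocessing as mp
import os
import time
from threading import Event, Thread

# Hypothetical limits, scaled down from 60 s so the demo finishes quickly.
WATCHDOG_SECONDS = 1.0
GET_TIMEOUT_SECONDS = 3.0


def watchdog(e: Event, timeout: float) -> None:
    # e.wait() returns False if the timeout expires before e.set() is called.
    if not e.wait(timeout=timeout):
        os._exit(-1)  # kill the whole worker process, not just this thread


def compute(test_num: int) -> int:
    # Hypothetical overrun: pretend the input 30 takes far longer than allowed.
    if test_num == 30:
        time.sleep(10)
    return test_num ** 2


def time_limited_compute(test_num: int) -> int:
    e = Event()
    Thread(target=watchdog, args=(e, WATCHDOG_SECONDS)).start()
    r = compute(test_num)
    e.set()  # finished in time: the watchdog thread returns without killing
    return r


def get_or_none(r):
    try:
        return r.get(GET_TIMEOUT_SECONDS)
    except mp.TimeoutError:  # the worker was killed, so no result ever arrives
        return None


def run_demo() -> list:
    test_num_list = [10, 20, 30, 40]
    with mp.Pool(processes=2) as pool:
        results = [pool.apply_async(time_limited_compute, args=(n,))
                   for n in test_num_list]
        return [get_or_none(r) for r in results]


if __name__ == "__main__":
    print(run_demo())  # expected: [100, 400, None, 1600]
```

The get() timeout is deliberately longer than the watchdog limit, so the caller never gives up on a task that is still within its allowed time. Keep the `if __name__ == "__main__":` guard: on Windows, workers start by re-importing the main module.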