Clean Python multiprocess termination dependent on an exit flag

Posted 2019-06-22 18:38

I am attempting to create a program using multiple processes, and I would like to cleanly terminate all the spawned processes if errors occur. Below I've written out some pseudo-type code for what I think I need to do, but I don't know the best way to communicate to all the processes that an error has occurred and they should terminate.

I think I should be using classes for this sort of thing but I'm quite new to Python so I'm just trying to get my head around the basics first.

import multiprocessing

exitFlag = True

# Function for the spawned processes to run
def url_thread_worker():
    # while exitFlag:
    try:
        pass  # do something
    except Exception:
        # we've run into a problem: we need to kill all the spawned
        # processes and cleanly exit the program
        exitFlag = False

def processStarter():
    process_1 = multiprocessing.Process(name="Process-1", target=url_thread_worker, args=())
    process_2 = multiprocessing.Process(name="Process-2", target=url_thread_worker, args=())

    process_1.start()
    process_2.start()


if __name__ == '__main__':
    processStarter()

Thanks in advance

2 Answers

甜甜的少女心 · 2019-06-22 19:33

Here's my suggestion:

import multiprocessing
import threading
import time

def good_worker():
    print("[GoodWorker] Starting")
    time.sleep(4)
    print("[GoodWorker] all good")

def bad_worker():
    print("[BadWorker] Starting")
    time.sleep(2)
    raise Exception("ups!")

class MyProcManager(object):
    def __init__(self):
        self.procs = []
        self.errors_flag = False
        self._threads = []
        self._lock = threading.Lock()

    def terminate_all(self):
        # Terminate every process that is still running.
        with self._lock:
            for p in self.procs:
                if p.is_alive():
                    print("Terminating %s" % p)
                    p.terminate()

    def launch_proc(self, func, args=(), kwargs={}):
        # Each process gets its own wrapper thread that watches it until it ends.
        t = threading.Thread(target=self._proc_thread_runner,
                             args=(func, args, kwargs))
        self._threads.append(t)
        t.start()

    def _proc_thread_runner(self, func, args, kwargs):
        p = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
        self.procs.append(p)
        p.start()
        # p.exitcode stays None until the process has actually ended
        while p.exitcode is None:
            p.join()
        if p.exitcode > 0:
            # a positive exit code means the process died on an
            # unhandled exception, so shut everything else down
            self.errors_flag = True
            self.terminate_all()

    def wait(self):
        # Block until all wrapper threads (and thus all processes) are done.
        for t in self._threads:
            t.join()

if __name__ == '__main__':
    proc_manager = MyProcManager()
    proc_manager.launch_proc(good_worker)
    proc_manager.launch_proc(good_worker)
    proc_manager.launch_proc(bad_worker)
    proc_manager.wait()
    if proc_manager.errors_flag:
        print("Errors flag is set: some process crashed")
    else:
        print("Everything closed cleanly")

You need a wrapper thread for each process run that waits for it to end. When a process ends, check its exitcode: if it is > 0, the process raised an unhandled exception. In that case, call terminate_all() to close all remaining active processes. The wrapper threads also finish, as they are dependent on the process run.
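
As a side note (my own sketch, not part of the original answer): a child that dies from an unhandled exception exits with code 1, while one killed by terminate() gets a negative exitcode equal to minus the signal number (e.g. -15 for SIGTERM on Unix), which is why the > 0 check above catches crashes but not deliberate terminations:

import multiprocessing
import time

def crasher():
    raise RuntimeError("boom")  # unhandled exception in the child

if __name__ == '__main__':
    p = multiprocessing.Process(target=crasher)
    p.start()
    p.join()
    print(p.exitcode)   # 1: the child raised an unhandled exception

    q = multiprocessing.Process(target=time.sleep, args=(30,))
    q.start()
    q.terminate()       # sends SIGTERM on Unix
    q.join()
    print(q.exitcode)   # -15 on Unix: the negative signal number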

Also, in your code you're completely free to call proc_manager.terminate_all() whenever you want, e.g. from a different thread that checks some flag.

Hope it's good for your case.

PS: by the way, in your original code you used something like a global exit_flag: you can never have a "global" exit flag in multiprocessing, because it simply isn't global; you are using separate processes with separate memory spaces. That only works in threaded environments, where state can be shared. If you need it in multiprocessing, you must have explicit communication between the processes (Pipe and Queue accomplish that) or use something like shared memory objects.
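
For illustration (my own sketch, assuming a worker that polls the flag), a shared exit flag can be built on multiprocessing.Value, which lives in shared memory and is visible to every child:

import multiprocessing
import time

def worker(stop_flag):
    # Poll the shared flag and exit cleanly once it is set.
    while not stop_flag.value:
        time.sleep(0.1)  # stand-in for real work
    print("%s exiting cleanly" % multiprocessing.current_process().name)

if __name__ == '__main__':
    stop_flag = multiprocessing.Value('b', 0)  # 'b' = signed char, in shared memory
    procs = [multiprocessing.Process(target=worker, args=(stop_flag,))
             for _ in range(2)]
    for p in procs:
        p.start()
    time.sleep(1)
    stop_flag.value = 1  # a "global" exit flag that actually crosses process boundaries
    for p in procs:
        p.join()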

虎瘦雄心在 · 2019-06-22 19:36

If you want your child processes to be terminated automatically when the parent process exits, you could make them daemonic (set .daemon=True before .start()). If the parent detects an error, it can then just quit, and the children will be taken care of.
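
A minimal sketch of the daemonic approach (my own example, not from the answer):

import multiprocessing
import sys
import time

def background_worker():
    # Runs forever; a daemonic child is killed abruptly and gets no
    # chance to clean up when the parent exits.
    while True:
        time.sleep(0.5)

if __name__ == '__main__':
    p = multiprocessing.Process(target=background_worker)
    p.daemon = True   # must be set before start()
    p.start()
    time.sleep(1)
    sys.exit(1)       # parent quits on an error; the daemonic child dies with it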

If you want the children to clean up after themselves, you could use multiprocessing.Event() as a global flag:

import multiprocessing
import time

def event_func(event):
    print('\t%r is waiting' % multiprocessing.current_process())
    event.wait()
    print('\t%r has woken up' % multiprocessing.current_process())

if __name__ == '__main__':
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print('main is sleeping')
    time.sleep(2)

    print('main is setting event')
    event.set()

    for p in processes:
        p.join()