multiprocessing.Pool spawning new children after t

Published 2019-02-03 20:11

Question:

I have an executable file which I need to run very often, with different parameters. For this I wrote a small Python (2.7) wrapper, using the multiprocessing module, following the pattern given here.

My code looks like this:

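import logging
import time

# pool (a multiprocessing.Pool) and params are created earlier in the script
try:
    logging.info("starting pool runs")
    pool.map(run_nlin, params)
    pool.close()
except KeyboardInterrupt:
    logging.info("^C pressed")
    pool.terminate()
except Exception as e:
    # logging formats lazily with %-style placeholders, so e needs a %s
    logging.info("exception caught: %s", e)
    pool.terminate()
finally:
    time.sleep(5)
    pool.join()
    logging.info("done")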

My worker function is here:

import subprocess as sp
import time

class KeyboardInterruptError(Exception): pass

def run_nlin(args):
    # Unpack here: tuple parameters in the def line only work in Python 2
    path_config, path_log, path_nlin, update_method = args
    try:
        with open(path_log, "w") as log_:
            cmdline = [path_nlin, path_config]
            if update_method:
                cmdline.append(update_method)
            # Run the binary, sending its stdout and stderr to the log file
            sp.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        time.sleep(5)
        raise KeyboardInterruptError()

path_config is the path to a configuration file for the binary program; it contains, for example, the date to run the program for.

When I start the wrapper, everything looks fine. However, when I press ^C, the wrapper script seems to launch another numproc processes from the pool before terminating. For example, when I start the script for days 1-10, the ps aux output shows two instances of the binary program running (usually for days 1 and 3). When I press ^C, the wrapper script exits and the binary programs for days 1 and 3 are gone, but new binary programs are now running for days 5 and 7.

So it seems to me as if the Pool launches another numproc processes before finally dying.

Any ideas what's happening here, and what I can do about it?

Answer 1:

On this page, Jesse Noller, author of the multiprocessing module, shows that the correct way to handle KeyboardInterrupt is to have the subprocesses return rather than re-raise the exception. This allows the main process to terminate the pool.
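Applied to the worker from the question, that advice might look like the following minimal sketch. It is an illustration, not Jesse Noller's code: returning None as an "interrupted" sentinel is an assumption, and the parent would still have to notice it and terminate the pool.

```python
import subprocess


def run_nlin(args):
    """Hypothetical rewrite of the question's worker.

    On Ctrl-C it returns None instead of re-raising, so the exception never
    travels back through the pool and the parent stays in control.
    """
    path_config, path_log, path_nlin, update_method = args
    cmdline = [path_nlin, path_config]
    if update_method:
        cmdline.append(update_method)
    try:
        with open(path_log, "w") as log_:
            # Return the binary's exit code on normal completion
            return subprocess.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        # Swallow the interrupt and report it with a sentinel value
        return None
```

The parent can then treat any None in the pool.map result as "this run was interrupted".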

However, as the code below shows, the main process does not reach the except KeyboardInterrupt block until after all the tasks generated by pool.map have been run. This is why (I believe) you are seeing extra calls to your worker function, run_nlin, after Ctrl-C has been pressed.
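The particular days seen in the question (1 and 3 before Ctrl-C, 5 and 7 after) are consistent with how pool.map splits its input when no chunksize is given: each chunk is sent to one worker as a single task, so only the first day of each chunk is visible in ps at first. The sketch mirrors the default-chunksize rule in CPython's Pool.map_async; default_chunks is a hypothetical helper, not part of the multiprocessing API, and a pool of 2 processes is assumed to match the two binaries in the question's ps output.

```python
def default_chunks(items, processes):
    """Split items the way CPython's Pool.map does when chunksize is None."""
    if not items:
        return []
    # Aim for roughly four chunks per worker process, rounding up.
    chunksize, extra = divmod(len(items), processes * 4)
    if extra:
        chunksize += 1
    return [items[i:i + chunksize] for i in range(0, len(items), chunksize)]


days = list(range(1, 11))                   # the question's days 1-10
chunks = default_chunks(days, processes=2)  # two workers, as in the ps output
print(chunks)                  # [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
print([c[0] for c in chunks])  # [1, 3, 5, 7, 9]
```

With 2 workers, the first two tasks start at days 1 and 3; once those tasks end, the workers pick up the chunks starting at days 5 and 7. For the answer's test script (12 tasks, 4 workers) the chunksize works out to 1, so every task is a single call.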

One possible workaround is to have every call of the worker function check whether a multiprocessing.Event has been set. If it has, the worker bails out early; otherwise, it goes ahead with the long calculation.


import logging
import multiprocessing as mp
import time

logger = mp.log_to_stderr(logging.WARNING)

def worker(x):
    try:
        if not terminating.is_set():
            logger.warning("Running worker({x!r})".format(x=x))
            time.sleep(3)  # stand-in for the long calculation
        else:
            logger.warning("got the message... we're terminating!")
    except KeyboardInterrupt:
        # Ctrl-C reaches every worker; record it instead of re-raising
        logger.warning("terminating is set")
        terminating.set()
    return x

def initializer(terminating_):
    # This places terminating in the global namespace of the worker subprocesses.
    # This allows the worker function to access `terminating` even though it is
    # not passed as an argument to the function.
    global terminating
    terminating = terminating_

def main():
    # Shared flag, handed to each worker process through the initializer
    terminating = mp.Event()
    result = []
    pool = mp.Pool(initializer=initializer, initargs=(terminating, ))
    params = range(12)
    try:
        logger.warning("starting pool runs")
        result = pool.map(worker, params)
        pool.close()
    except KeyboardInterrupt:
        logger.warning("^C pressed")
        pool.terminate()
    finally:
        pool.join()
        logger.warning('done: {r}'.format(r=result))

if __name__ == '__main__':
    main()

Running the script yields:

% test.py
[WARNING/MainProcess] starting pool runs
[WARNING/PoolWorker-1] Running worker(0)
[WARNING/PoolWorker-2] Running worker(1)
[WARNING/PoolWorker-3] Running worker(2)
[WARNING/PoolWorker-4] Running worker(3)

Here Ctrl-C is pressed. The terminal sends SIGINT to every process in the foreground process group, so each worker catches KeyboardInterrupt and sets the terminating event. Only one worker needs to set it, but the redundancy is harmless.

  C-c C-c[WARNING/PoolWorker-4] terminating is set
[WARNING/PoolWorker-2] terminating is set
[WARNING/PoolWorker-3] terminating is set
[WARNING/PoolWorker-1] terminating is set

Now all the other tasks queued by pool.map are run:

[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-3] got the message... we're terminating!

Finally, the main process reaches the except KeyboardInterrupt block.

[WARNING/MainProcess] ^C pressed
[WARNING/MainProcess] done: []
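The output shows the main process reaching except KeyboardInterrupt only after every queued task has run, most likely because it is blocked inside pool.map. A commonly used complementary workaround on Python 2.7, which is not part of this answer, is to call map_async and wait with an explicit timeout: in Python 2, AsyncResult.get() without a timeout cannot be interrupted by Ctrl-C, while a timed wait can. A minimal sketch; map_interruptibly and the one-day default timeout are hypothetical choices.

```python
def map_interruptibly(pool, func, params, timeout=24 * 60 * 60):
    """Like pool.map, but waits with a timeout so Ctrl-C can reach the parent.

    If the timeout expires, multiprocessing.TimeoutError is raised.
    """
    async_result = pool.map_async(func, params)
    try:
        # A timed wait stays interruptible under Python 2.7
        result = async_result.get(timeout)
    except BaseException:
        # Ctrl-C, a timeout, or an exception from a worker: stop handing out
        # queued tasks and kill the worker processes, then re-raise.
        pool.terminate()
        raise
    else:
        pool.close()
        return result
    finally:
        pool.join()
```

In main(), the try/except/finally block could then become result = map_interruptibly(pool, worker, params), with the caller catching the re-raised KeyboardInterrupt to log "^C pressed".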