I have an executable file which I need to run very often, with different parameters. For this I wrote a small Python (2.7) wrapper, using the multiprocessing module, following the pattern given here.
My code looks like this:
try:
    logging.info("starting pool runs")
    pool.map(run_nlin, params)
    pool.close()
except KeyboardInterrupt:
    logging.info("^C pressed")
    pool.terminate()
except Exception, e:
    # logging uses %-style placeholders; without %s the extra argument breaks formatting
    logging.info("exception caught: %s", e)
    pool.terminate()
finally:
    time.sleep(5)
    pool.join()
    logging.info("done")
My worker function is here:
class KeyboardInterruptError(Exception): pass

def run_nlin((path_config, path_log, path_nlin, update_method)):
    try:
        with open(path_log, "w") as log_:
            cmdline = [path_nlin, path_config]
            if update_method:
                cmdline += [update_method, ]
            sp.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        time.sleep(5)
        raise KeyboardInterruptError()
    except:
        raise
path_config is the path to a configuration file for the binary program; it contains, for example, the date to run the program for.
When I start the wrapper, everything looks fine. However, when I press ^C, the wrapper script seems to launch an additional numproc processes from the pool before terminating. As an example, when I start the script for days 1-10, I can see in the ps aux output that two instances of the binary program are running (usually for days 1 and 3). Now, when I press ^C, the wrapper script exits, the binary programs for days 1 and 3 are gone, but there are new binary programs running for days 5 and 7.
So to me it seems as if the Pool launches another numproc processes before finally dying.
Any ideas what's happening here, and what I can do about it?
On this page, Jesse Noller, author of the multiprocessing module, shows that the correct way to handle KeyboardInterrupt is to have the subprocesses return rather than re-raise the exception. This allows the main process to terminate the pool.
However, as the code below shows, the main process does not reach the except KeyboardInterrupt block until after all the tasks generated by pool.map have been run. This is why (I believe) you are seeing extra calls to your worker function, run_nlin, after Ctrl-C has been pressed.
One possible workaround is to have all the worker functions test whether a multiprocessing.Event has been set. If it has, the worker bails out early; otherwise, it goes ahead with the long calculation.
import logging
import multiprocessing as mp
import time

logger = mp.log_to_stderr(logging.WARNING)

def worker(x):
    try:
        if not terminating.is_set():
            logger.warn("Running worker({x!r})".format(x = x))
            time.sleep(3)
        else:
            logger.warn("got the message... we're terminating!")
    except KeyboardInterrupt:
        logger.warn("terminating is set")
        terminating.set()
    return x

def initializer(terminating_):
    # This places terminating in the global namespace of the worker subprocesses.
    # This allows the worker function to access `terminating` even though it is
    # not passed as an argument to the function.
    global terminating
    terminating = terminating_

def main():
    terminating = mp.Event()
    result = []
    pool = mp.Pool(initializer=initializer, initargs=(terminating, ))
    params = range(12)
    try:
        logger.warn("starting pool runs")
        result = pool.map(worker, params)
        pool.close()
    except KeyboardInterrupt:
        logger.warn("^C pressed")
        pool.terminate()
    finally:
        pool.join()
    logger.warn('done: {r}'.format(r = result))

if __name__ == '__main__':
    main()
Running the script yields:
% test.py
[WARNING/MainProcess] starting pool runs
[WARNING/PoolWorker-1] Running worker(0)
[WARNING/PoolWorker-2] Running worker(1)
[WARNING/PoolWorker-3] Running worker(2)
[WARNING/PoolWorker-4] Running worker(3)
Here Ctrl-C is pressed; each of the workers sets the terminating event. We really only need one to set it, but this works despite the small inefficiency.
C-c C-c[WARNING/PoolWorker-4] terminating is set
[WARNING/PoolWorker-2] terminating is set
[WARNING/PoolWorker-3] terminating is set
[WARNING/PoolWorker-1] terminating is set
Now all the other tasks queued by pool.map are run:
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-3] got the message... we're terminating!
Finally the main process reaches the except KeyboardInterrupt block.
[WARNING/MainProcess] ^C pressed
[WARNING/MainProcess] done: []
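For the run_nlin worker from the question, the same idea might look roughly like the sketch below. It is untested against your binary and makes a few assumptions: `init_worker` and `run_all` are names I made up for illustration, the worker returns `None` for skipped or interrupted tasks, and the tuple is unpacked inside the function so the code also runs on Python 3.

```python
import multiprocessing as mp
import subprocess as sp


def init_worker(terminating_):
    # Hypothetical initializer: makes the shared Event visible to run_nlin
    # as a global inside each worker process.
    global terminating
    terminating = terminating_


def run_nlin(args):
    path_config, path_log, path_nlin, update_method = args
    if terminating.is_set():
        # ^C was already seen by some worker: skip this task instead of
        # launching another instance of the binary.
        return None
    cmdline = [path_nlin, path_config]
    if update_method:
        cmdline.append(update_method)
    try:
        with open(path_log, "w") as log_:
            return sp.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        # Return instead of re-raising, and tell the queued tasks to bail out.
        terminating.set()
        return None


def run_all(params, processes=None):
    # Hypothetical driver: same structure as main() above.
    terminating = mp.Event()
    pool = mp.Pool(processes, initializer=init_worker, initargs=(terminating,))
    try:
        results = pool.map(run_nlin, params)
        pool.close()
        return results
    except KeyboardInterrupt:
        pool.terminate()
        return []
    finally:
        pool.join()
```

You would call `run_all(params)` from under an `if __name__ == '__main__':` guard. The remaining tasks are still handed to the workers after Ctrl-C, but each one should return immediately without starting the binary.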