Python subprocess: callback when cmd exits

Posted 2019-01-06 10:16

Question:

I'm currently launching a programme using subprocess.Popen(cmd, shell=True)

I'm fairly new to Python, but it 'feels' like there ought to be some API that lets me do something similar to:

subprocess.Popen(cmd, shell=True, postexec_fn=function_to_call_on_exit)

I am doing this so that function_to_call_on_exit can do something based on knowing that the cmd has exited (for example, keeping count of the number of external processes currently running).

I assume that I could fairly trivially wrap subprocess in a class that combined threading with the Popen.wait() method, but as I've not done threading in Python yet and it seems like this might be common enough for an API to exist, I thought I'd try and find one first.

Thanks in advance :)

Answer 1:

You're right - there is no nice API for this. You're also right on your second point - it's trivially easy to design a function that does this for you using threading.

import threading
import subprocess

def popenAndCall(onExit, popenArgs):
    """
    Runs the given args in a subprocess.Popen, and then calls the function
    onExit when the subprocess completes.
    onExit is a callable object, and popenArgs is a list/tuple of args that
    you would give to subprocess.Popen.
    """
    def runInThread(onExit, popenArgs):
        proc = subprocess.Popen(*popenArgs)
        proc.wait()
        onExit()
        return
    thread = threading.Thread(target=runInThread, args=(onExit, popenArgs))
    thread.start()
    # returns immediately after the thread starts
    return thread

Threading is pretty easy in Python, but note that if onExit() is computationally expensive, you'll want to run it in a separate process using multiprocessing instead (so that the GIL doesn't slow your program down). It's actually very simple: you can basically just replace all calls to threading.Thread with multiprocessing.Process, since they follow (almost) the same API. Keep in mind that onExit() then runs in the child process, so it cannot directly modify variables in the parent.
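
With the threaded version, onExit runs in a helper thread of the same process, so it can update shared state directly as long as access is guarded by a lock. Here is a minimal sketch of the use case from the question (counting the external processes currently running). The RunningCounter class and its launch method are hypothetical names made up for this illustration, and sys.executable is used only as a stand-in command that exists on any machine.

```python
import subprocess
import sys
import threading


class RunningCounter:
    """Hypothetical helper: tracks how many child processes are still running."""

    def __init__(self):
        self._lock = threading.Lock()
        self.running = 0

    def launch(self, *popen_args, **popen_kwargs):
        # Start the child in the calling thread so launch errors surface here.
        proc = subprocess.Popen(*popen_args, **popen_kwargs)
        with self._lock:
            self.running += 1
        waiter = threading.Thread(target=self._wait, args=(proc,))
        waiter.start()
        return waiter

    def _wait(self, proc):
        # Runs in the helper thread: block until the child exits, then count down.
        proc.wait()
        with self._lock:
            self.running -= 1


counter = RunningCounter()
# Stand-in command: a Python child that exits immediately.
waiters = [counter.launch([sys.executable, "-c", "pass"]) for _ in range(3)]
for waiter in waiters:
    waiter.join()
print(counter.running)  # prints 0 once every child has exited
```

The lock matters because several helper threads may finish at nearly the same time and each one modifies the same counter.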



Answer 2:

Python 3.2 and later include the concurrent.futures module (for older Python < 3.2 it is available via pip install futures):

pool = Pool(max_workers=1)
f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
f.add_done_callback(callback)

The callback will be called in the same process that called f.add_done_callback().
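
To make that concrete, here is a small sketch that records the process ID seen inside the callback together with the command's return code. The on_done function and the seen dictionary are names invented for this example, and the child command is just a Python one-liner chosen so that the exit status is known.

```python
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

seen = {}


def on_done(future):
    # Called once the future finishes; future.result() is the value
    # returned by subprocess.call, i.e. the child's exit status.
    seen["pid"] = os.getpid()
    seen["returncode"] = future.result()


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(
        subprocess.call, [sys.executable, "-c", "raise SystemExit(3)"])
    future.add_done_callback(on_done)
# Leaving the with-block waits for the worker thread to finish.

print(seen["pid"] == os.getpid(), seen["returncode"])  # prints: True 3
```

If the future has already finished when add_done_callback() is called, the callback runs immediately in the calling thread; otherwise it runs in the worker thread, which is why the log output further down shows Thread-1.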

Full program

import logging
import subprocess
# to install run `pip install futures` on Python <3.2
from concurrent.futures import ThreadPoolExecutor as Pool

info = logging.getLogger(__name__).info

def callback(future):
    if future.exception() is not None:
        info("got exception: %s" % future.exception())
    else:
        info("process returned %d" % future.result())

def main():
    logging.basicConfig(
        level=logging.INFO,
        format=("%(relativeCreated)04d %(process)05d %(threadName)-10s "
                "%(levelname)-5s %(message)s"))

    # wait for the process completion asynchronously
    info("begin waiting")
    pool = Pool(max_workers=1)
    f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
    f.add_done_callback(callback)
    pool.shutdown(wait=False) # no .submit() calls after that point
    info("continue waiting asynchronously")

if __name__=="__main__":
    main()

Output

$ python . && python3 .
0013 05382 MainThread INFO  begin waiting
0021 05382 MainThread INFO  continue waiting asynchronously
done
2025 05382 Thread-1   INFO  process returned 0
0007 05402 MainThread INFO  begin waiting
0014 05402 MainThread INFO  continue waiting asynchronously
done
2018 05402 Thread-1   INFO  process returned 0


Answer 3:

I modified Daniel G's answer to simply pass the subprocess.Popen args and kwargs as themselves instead of as a separate tuple/list, since I wanted to use keyword arguments with subprocess.Popen.

In my case I had a method postExec() that I wanted to run after subprocess.Popen('exe', cwd=WORKING_DIR)

With the code below, it simply becomes popenAndCall(postExec, 'exe', cwd=WORKING_DIR)

import threading
import subprocess

def popenAndCall(onExit, *popenArgs, **popenKWArgs):
    """
    Runs a subprocess.Popen, and then calls the function onExit when the
    subprocess completes.

    Use it exactly the way you'd normally use subprocess.Popen, except include a
    callable to execute as the first argument. onExit is a callable object, and
    *popenArgs and **popenKWArgs are simply passed on to subprocess.Popen.
    """
    def runInThread(onExit, popenArgs, popenKWArgs):
        proc = subprocess.Popen(*popenArgs, **popenKWArgs)
        proc.wait()
        onExit()
        return

    thread = threading.Thread(target=runInThread,
                              args=(onExit, popenArgs, popenKWArgs))
    thread.start()

    return thread # returns immediately after the thread starts
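
As a usage sketch of the keyword-argument form, the variant below also hands the child's exit code to the callback, which is often what the callback wants to know. This is an assumption layered on top of the answer rather than part of it: popen_and_call, exit_codes and work_dir are illustrative names, and the temp directory is only a stand-in for WORKING_DIR.

```python
import subprocess
import sys
import tempfile
import threading


def popen_and_call(on_exit, *popen_args, **popen_kwargs):
    """Variant for illustration: on_exit receives the child's exit code."""
    def run_in_thread():
        proc = subprocess.Popen(*popen_args, **popen_kwargs)
        # wait() blocks until the child exits and returns its exit code
        on_exit(proc.wait())

    thread = threading.Thread(target=run_in_thread)
    thread.start()
    return thread  # returns immediately after the thread starts


exit_codes = []
work_dir = tempfile.gettempdir()  # stand-in for WORKING_DIR
thread = popen_and_call(
    exit_codes.append,
    [sys.executable, "-c", "raise SystemExit(7)"],
    cwd=work_dir,
)
thread.join()  # only needed here so the result can be printed
print(exit_codes)  # prints [7]
```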


Answer 4:

I had the same problem and solved it using multiprocessing.Pool. There are two hacky tricks involved:

  1. make the pool size 1
  2. pass the iterable arguments inside an iterable of length 1

The result is one function executed with a callback on completion:

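import multiprocessing

def sub(arg):
    print(arg)            # prints [1, 2, 3, 4, 5]
    return "hello"

def cb(results):
    # map_async passes the callback a list with one result per input item
    print(results)        # prints ['hello']

if __name__ == "__main__":
    pool = multiprocessing.Pool(1)
    rval = pool.map_async(sub, [[1, 2, 3, 4, 5]], callback=cb)
    # (do other work here; map_async does not block)
    pool.close()
    pool.join()           # wait for the worker so the callback has run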

In my case, I wanted the invocation to be non-blocking as well. It works beautifully.
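
Both tricks can probably be avoided with apply_async, which runs a single call and passes its single return value to the callback. The sketch below swaps in multiprocessing.pool.ThreadPool, which offers the same apply_async interface as multiprocessing.Pool; that swap is my assumption to keep the example short, since waiting on an external command is not CPU-bound. The received list is a name invented for the example.

```python
from multiprocessing.pool import ThreadPool

received = []


def sub(arg):
    return "hello from %s" % (arg,)


def cb(result):
    # apply_async hands the callback the single return value, not a list
    received.append(result)


pool = ThreadPool(1)
async_result = pool.apply_async(sub, ([1, 2, 3, 4, 5],), callback=cb)
# ... other work can happen here; apply_async does not block ...
pool.close()
pool.join()  # wait until the task and its callback have finished
print(received)  # prints ['hello from [1, 2, 3, 4, 5]']
```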



Answer 5:

I was inspired by Daniel G's answer and implemented a very simple use case: in my work I often need to make repeated calls to the same (external) process with different arguments. I had hacked a way to determine when each specific call was done, but now I have a much cleaner way to issue callbacks.

I like this implementation because it is very simple, yet it allows me to issue asynchronous calls to multiple processors (notice I use multiprocessing instead of threading) and receive notification upon completion.

I tested the sample program and it works great. Please edit at will and provide feedback.

import multiprocessing
import subprocess

class Process(object):
    """This class spawns a subprocess asynchronously and calls a
    `callback` upon completion; it is not meant to be instantiated
    directly (derived classes are called instead)"""
    def __call__(self, *args):
        # store the arguments for later retrieval
        self.args = args
        # define the target function to be called by
        # `multiprocessing.Process`
        # (a nested function cannot be pickled, so this relies on the
        # fork start method and fails under spawn, e.g. on Windows)
        def target():
            cmd = [self.command] + [str(arg) for arg in self.args]
            process = subprocess.Popen(cmd)
            # the `multiprocessing.Process` process will wait until
            # the call to the `subprocess.Popen` object is completed
            process.wait()
            # upon completion, call `callback`
            return self.callback()
        mp_process = multiprocessing.Process(target=target)
        # this call issues the call to `target`, but returns immediately
        mp_process.start()
        return mp_process

if __name__ == "__main__":

    def squeal(who):
        """this serves as the callback function; its argument is the
        instance of a subclass of Process making the call"""
        print("finished %s calling %s with arguments %s" % (
            who.__class__.__name__, who.command, who.args))

    class Sleeper(Process):
        """Sample implementation of an asynchronous process - define
        the command name (available in the system path) and a callback
        function (previously defined)"""
        command = "./sleeper"
        callback = squeal

    # create an instance of Sleeper - this is the Process object that
    # can be called repeatedly in an asynchronous manner
    sleeper_run = Sleeper()

    # spawn three sleeper runs with different arguments
    sleeper_run(5)
    sleeper_run(2)
    sleeper_run(1)

    # the user should see the following message immediately (even
    # though the Sleeper calls are not done yet)
    print("program continued")

Sample output:

program continued
finished Sleeper calling ./sleeper with arguments (1,)
finished Sleeper calling ./sleeper with arguments (2,)
finished Sleeper calling ./sleeper with arguments (5,)

Below is the source code of sleeper.c - my sample "time consuming" external process

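#include <stdlib.h>
#include <unistd.h>

int main(int argc, char *argv[]){
  /* expect the sleep duration in seconds as the only argument */
  if (argc < 2)
    return EXIT_FAILURE;
  unsigned int t = atoi(argv[1]);
  sleep(t);
  return EXIT_SUCCESS;
}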

compile as:

gcc -o sleeper sleeper.c


Answer 6:

AFAIK there is no such API, at least not in the subprocess module. You need to roll your own, possibly using threads.
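
Outside the subprocess module, one more way to roll your own on Python 3.7+ is asyncio from the standard library, which can wait for a child process without a thread per command. The following is only a sketch under that assumption: run_and_notify, main and finished are names made up here, and the child commands are Python one-liners with known exit codes.

```python
import asyncio
import sys

finished = []


async def run_and_notify(on_exit, *cmd):
    # Start the child, wait for it without blocking the event loop,
    # then pass its exit code to the callback.
    proc = await asyncio.create_subprocess_exec(*cmd)
    returncode = await proc.wait()
    on_exit(returncode)


async def main():
    tasks = []
    for code in (0, 1):
        child_source = "raise SystemExit(%d)" % code
        tasks.append(asyncio.create_task(
            run_and_notify(finished.append, sys.executable, "-c", child_source)))
    # Other coroutines could run here while the children are busy.
    await asyncio.gather(*tasks)


asyncio.run(main())
print(sorted(finished))  # prints [0, 1]
```

This only fits programs that are already built around an event loop; for plain synchronous code the thread-based wrappers are simpler.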