Limiting the number of processes running at a time

Posted 2019-07-03 20:54

Question:

I'm running a backup script that launches child processes to perform backups with rsync. However, I have no way to limit the number of rsyncs it launches at a time.

Here's the code I'm working on at the moment:

print "active_children: ", multiprocessing.active_children()
print "active_children len: ", len(multiprocessing.active_children())
while len(multiprocessing.active_children()) > 49:
   sleep(2)
p = multiprocessing.Process(target=do_backup, args=(shash["NAME"],ip,shash["buTYPE"], ))
jobs.append(p)
p.start()

This is showing a maximum of one child when I have hundreds of rsyncs running. Here's the code that actually launches the rsync (from inside the do_backup function), with command being a variable containing the rsync line:

print command
subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
return 1

If I add a sleep(x) to the do_backup function it will show up as an active child while it's sleeping. Also the process table is showing the rsync processes as having a PPID of 1. I'm assuming from this that the rsync splits off and is no longer a child of python which allows my child process to die so I can't count it anymore. Does anyone know how to keep the python child alive and being counted until the rsync is complete?

Answer 1:

Let's clear up some misconceptions first.

"I'm assuming from this that the rsync splits off and is no longer a child of python which allows my child process to die so I can't count it anymore."

rsync does "split off". On UNIX systems, this is called a fork.

When a process forks, a child process is created - so rsync is a child of python. This child executes independently of the parent - and concurrently ("at the same time").

A process can manage its own children. There are specific syscalls for that, but they are a bit off-topic when talking about Python, which has its own high-level interfaces.

If you check subprocess.Popen's documentation, you'll notice that it's not a function at all: it's a class. By calling it, you create an instance of that class, a Popen object. Such objects have multiple methods. In particular, wait blocks your parent process (python) until the child process terminates.
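To make the role of wait concrete, here is a minimal sketch. The helper name run_and_wait and the placeholder command are assumptions for illustration; in the backup script the command would be the rsync line.

```python
import subprocess
import sys

def run_and_wait(command):
    """Start command as a child process and block until it exits."""
    proc = subprocess.Popen(command)  # returns immediately; the child runs concurrently
    return proc.wait()                # blocks here and returns the child's exit code

if __name__ == "__main__":
    # Placeholder standing in for the real rsync command (assumption).
    placeholder = [sys.executable, "-c", "import time; time.sleep(1)"]
    print(run_and_wait(placeholder))
```

One caveat: the question passes stdout=subprocess.PIPE. With a pipe attached, wait can deadlock once the child fills the pipe buffer, so in that case communicate() is the safer call, or simply leave stdout unpiped as above.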


With this in mind, let's take a look at your code and simplify it a bit:

p = multiprocessing.Process(target=do_backup, ...)

Here, you're actually forking and creating a child process. This process is another python interpreter (as with all multiprocessing processes), and will execute the do_backup function.

def do_backup():
    subprocess.Popen("rsync ...", ...)

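Here, you are forking again. You create yet another process (rsync) and let it run "in the background", because you're not waiting for it. do_backup then returns and the multiprocessing child exits almost immediately. The rsync it started is orphaned and adopted by init, which is why you see a PPID of 1 and why active_children() reports at most one process.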
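As a rough sketch of how the throttling loop from the question behaves once the worker waits for its command: the names run_throttled and max_active, the small limit, and the placeholder command are all assumptions for illustration, and the sketch assumes a Unix host where the fork start method is available.

```python
import multiprocessing
import subprocess
import sys
import time

def do_backup(command):
    # call() starts the command and blocks until it exits, so this worker
    # stays alive (and counted) for as long as the command runs.
    return subprocess.call(command)

def run_throttled(commands, max_active=3):
    """Run each command in its own worker, at most max_active at a time."""
    # Assumption: Unix host, so the fork start method can be requested.
    ctx = multiprocessing.get_context("fork")
    jobs = []
    peak = 0  # highest number of live workers observed, for illustration only
    for command in commands:
        # Same idea as the loop in the question: wait for a free slot.
        while len(multiprocessing.active_children()) >= max_active:
            time.sleep(0.05)
        p = ctx.Process(target=do_backup, args=(command,))
        p.start()
        jobs.append(p)
        peak = max(peak, len(multiprocessing.active_children()))
    for p in jobs:
        p.join()
    return peak

if __name__ == "__main__":
    # Placeholder standing in for the real rsync command (assumption).
    placeholder = [sys.executable, "-c", "import time; time.sleep(0.5)"]
    print("peak workers:", run_throttled([placeholder] * 6))
```

Because each worker now lives exactly as long as its command, counting active children is equivalent to counting running commands.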


With all this cleared up, I hope you can see a way forward with your existing code. If you want to reduce its complexity, I recommend you check and adapt JoErNanO's answer, which uses multiprocessing.Pool to automate keeping track of the processes.

Whichever way you decide to pursue, you should avoid forking with Popen to create the rsync process, as that creates yet another process unnecessarily. Instead, check os.execv, which replaces the current process with another.
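A rough sketch of the os.execv idea, assuming a Unix host and the fork start method. The name run_exec_worker and the placeholder command are illustrative assumptions. Note that os.execv needs the full path to the executable, and that nothing after it runs in the worker, because the worker has become the command.

```python
import multiprocessing
import os
import sys

def do_backup(argv):
    # execv does not return on success: this Python worker is replaced by
    # the command, which keeps the same PID and the same parent.
    os.execv(argv[0], argv)

def run_exec_worker(argv):
    """Run argv in a worker that turns into the command; return its exit code."""
    # Assumption: Unix host, so the fork start method can be requested.
    ctx = multiprocessing.get_context("fork")
    p = ctx.Process(target=do_backup, args=(argv,))
    p.start()
    p.join()           # waits for the command itself, since it took over the worker
    return p.exitcode  # exit status of the command

if __name__ == "__main__":
    # Placeholder standing in for something like ["/usr/bin/rsync", ...] (assumption).
    placeholder = [sys.executable, "-c", "import sys; sys.exit(0)"]
    print(run_exec_worker(placeholder))
```

With this arrangement the process seen by active_children() is the command itself, so there is one process per backup instead of two.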



Answer 2:

Multiprocessing Pools

Have you thought about using multiprocessing.Pool? A pool lets you define a fixed number of worker processes that carry out the jobs you submit. The key here is the fixed number: as long as each job waits for its rsync to finish, it gives you full control over how many instances of rsync run at once.

Looking at the example provided in the documentation, you first declare a Pool of n processes, and then decide whether to map() or apply() your job to the pool (each has an _async() sibling).

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    pool.apply_async(f, (10,))    # evaluate "f(10)" asynchronously
    ...
    pool.map(f, range(10))

The obvious advantage here is that you will never unexpectedly fork-bomb your machine as you will spawn only the requested n processes.

Running Your rsync

Your process spawning code would then become something like:

import time
from multiprocessing import Pool

def do_backup(arg1, arg2, arg3, ...):
    # Run rsync here and wait for it to finish before returning

if __name__ == '__main__':
    # Start a Pool with 4 processes
    pool = Pool(processes=4)
    jobs = []

    for ... :
        # Submit the function to the pool without blocking
        proc = pool.apply_async(func=do_backup, args=(shash["NAME"], ip, shash["buTYPE"]))
        jobs.append(proc)

    # Wait for jobs to complete before exiting
    while not all(p.ready() for p in jobs):
        time.sleep(5)

    # Safely terminate the pool
    pool.close()
    pool.join()
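For a version that runs end to end, one possible shortcut is to hand subprocess.call straight to the pool: it blocks until the command exits, so each worker handles one command at a time. This is only a sketch under assumptions. The name run_all and the placeholder commands are illustrative, and it assumes a Unix host with the fork start method.

```python
import multiprocessing
import subprocess
import sys

def run_all(commands, workers=4):
    """Run every command, at most `workers` at once; return their exit codes."""
    # Assumption: Unix host, so the fork start method can be requested.
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(processes=workers) as pool:
        # map() blocks until every command has finished. Each worker runs
        # subprocess.call, which itself waits for the command to exit.
        return pool.map(subprocess.call, commands)

if __name__ == "__main__":
    # Placeholders standing in for the real rsync command lines (assumption).
    commands = [[sys.executable, "-c", "import time; time.sleep(0.5)"]] * 8
    print(run_all(commands, workers=4))
```

The returned list holds one exit code per command, in the same order as the input, which makes it easy to spot failed backups.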


Answer 3:

This is multiprocessing, not multithreading. I'm assuming you're on a Unix system since you're using rsync, although I believe it can also run on Windows. In order to control the death of spawned child processes, you must fork them.

There's a good question about doing it in Python here.