Multiprocessing processes become zombies when increasing the number of iterations

Posted 2019-07-24 11:16

Question:

I'm trying to use multiprocessing to spawn 4 processes that brute-force some calculations, where each process has a very small chance, on each iteration, of manipulating a single list object that I want to share between them. This isn't best practice according to the guidelines, but I need "many hands".

The code works fine for relatively small numbers of iterations, but when I increase the number past a certain threshold, all four processes go into a zombie state. They fail silently.

I attempted to track the modifications to the shared list by using multiprocessing.Queue(). It appears from this SO post, this closed Python issue ("not a bug"), and several posts referring to them, that the underlying pipe can become overloaded so that the processes simply hang. The accepted answer in the SO post is very difficult to decipher because of the amount of excess code.
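As I understand those links, the failure mode boils down to something like this toy example (my own minimal sketch, not code from the linked posts; the exact pipe buffer size varies by platform):

import multiprocessing as mp

def worker(queue):
    # put() hands the data to a background feeder thread; that thread
    # blocks once the underlying pipe buffer (often around 64 KB on
    # Linux) is full, because nothing is reading from the other end yet.
    queue.put('x' * (1 << 20))  # ~1 MB, far more than the pipe can hold

if __name__ == '__main__':
    queue = mp.Queue()
    p = mp.Process(target=worker, args=(queue,))
    p.start()
    p.join()                 # Hangs: join() waits for the child to exit,
                             # the child waits for its feeder thread to
                             # flush the pipe, the pipe waits for a get().
    print len(queue.get())   # Never reached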

Edited for clarity:
The examples in the documentation only do very lightweight things, almost always single function calls, so I don't know whether I'm misunderstanding and abusing these features.

The guidelines say:

It is probably best to stick to using queues or pipes for communication between processes rather than using the lower level synchronization primitives from the threading module.

Does "communicate" here mean something other than what I'm actually doing in my example?
Or
Does this mean that I should be sharing my_list in the queue rather than with a manager? Wouldn't this mean queue.get and queue.put on every iteration of every process?
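To be explicit about what I mean, this is the hypothetical pattern I'm asking about (not code I'm actually running; mutate_list_via_queue is a name I've made up):

import random

def mutate_list_via_queue(queue, iterations):
    # Hypothetical: the single shared list lives on the queue instead
    # of in a manager, so every process must take it off the queue,
    # mutate it, and put it back on every single iteration.
    for x in xrange(iterations):
        my_list = queue.get()    # blocks until no other process holds it
        if random.random() < 0.01:
            random.shuffle(my_list)
        queue.put(my_list)       # hand the list back for the next taker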

The Queue documentation also says:

If maxsize is less than or equal to zero, the queue size is infinite.

Setting maxsize this way does not fix my failing example. And up to the point where I call queue.put(), all of the data is stored in a normal Python list, my_return_list, so is this actually failing because of the behaviour described in the links above?

Is there a better way of doing this than my current workaround? I can't seem to find anyone else taking a similar approach, so I feel I'm missing something. I need this to work on both Windows and Linux.

Failing example (whether it fails depends on the iteration count set under __main__):

import multiprocessing as mp
import random
import sys

def mutate_list(my_list, proc_num, iterations, queue, lock):
    my_return_list = []

    if iterations < 1001:
        # Works fine
        for x in xrange(iterations):
            if random.random() < 0.01:
                lock.acquire()

                print "Process {} changed list from:".format(proc_num)
                print my_list
                print "to"
                random.shuffle(my_list)
                print my_list
                print "........"
                sys.stdout.flush()

                lock.release()

                my_return_list.append([x, list(my_list)])
    else:
        for x in xrange(iterations):
            # Enters zombie state
            if random.random() < 0.01:
                lock.acquire()
                random.shuffle(my_list)
                my_return_list.append([x, list(my_list)])
                lock.release()
            if x % 1000 == 0:
                print "Completed iterations:", x
                sys.stdout.flush()

    queue.put(my_return_list)


def multi_proc_handler(iterations):

    manager = mp.Manager()
    ns = manager.list()
    ns.extend([x for x in range(10)])
    queue = mp.Queue()
    lock = manager.Lock()

    print "Starting list to share", ns
    print ns
    sys.stdout.flush()

    p = [mp.Process(target=mutate_list, args=(ns,x,iterations,queue,lock)) for x in range(4)]

    for process in p: process.start()
    for process in p: process.join()

    output = [queue.get() for process in p]
    return output

if __name__ == '__main__':
    # 1000 iterations is fine, 100000 iterations will create zombies
    multi_caller = multi_proc_handler(100000) 

Workaround using multiprocessing.Manager.list():

import multiprocessing as mp
import random
import sys

def mutate_list(my_list, proc_num, iterations, my_final_list, lock):

    for x in xrange(iterations):
        if random.random() < 0.01:
            lock.acquire()
            random.shuffle(my_list)
            my_final_list.append([x, list(my_list)])
            lock.release()
        if x % 10000 == 0:
            print "Completed iterations:", x
            sys.stdout.flush()


def multi_proc_handler(iterations):

    manager = mp.Manager()
    ns = manager.list([x for x in range(10)])
    lock = manager.Lock()

    my_final_list = manager.list() # My Queue substitute

    print "Starting list to share", ns
    print ns
    sys.stdout.flush()

    p = [mp.Process(target=mutate_list, args=(ns,x,iterations,my_final_list,
        lock)) for x in range(4)]

    for process in p: process.start()
    for process in p: process.join()

    return list(my_final_list)

if __name__ == '__main__':

    multi_caller = multi_proc_handler(100000) 

Answer 1:

Queue versus list

Under the hood, a multiprocessing.Queue and a manager.list() both write to and read from a buffer.

Queue

shared_queue = multiprocessing.Queue()

When you call put with N or more bytes (where N depends on many variables), the data is more than the buffer can hold and the put blocks. You may be able to unblock the put by calling get in another process. This is an experiment that should be easy to perform with the first version of your code, and I highly recommend trying it.
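For example, here is one way to run that experiment on your first example (this reordering is my suggestion, reusing your mutate_list unchanged): move the queue.get calls in front of the join calls, so there is always a reader while the children are still writing:

import multiprocessing as mp

def multi_proc_handler(iterations):
    # mutate_list is the function from your failing example
    manager = mp.Manager()
    ns = manager.list(range(10))
    queue = mp.Queue()
    lock = manager.Lock()

    p = [mp.Process(target=mutate_list, args=(ns, x, iterations, queue, lock))
         for x in range(4)]
    for process in p: process.start()

    # Drain the queue *while* the children are still running, so every
    # put() has a reader and each child can flush its data and exit...
    output = [queue.get() for process in p]

    # ...and only join once everything has been consumed.
    for process in p: process.join()
    return output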

list

manager = multiprocessing.Manager()
shared_list = manager.list()

When you call append, you are passing much less than N bytes, and the write to the buffer succeeds. Another process, created by the manager, reads the data from the buffer and appends it to the actual list. Even if you call append with N or more bytes, everything should keep working, because that manager process keeps reading from the buffer. In this way you can pass an arbitrary number of bytes to another process.
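Here is a minimal toy sketch of that path (my own example, with made-up sizes), showing that a child can push a large amount of data through a managed list and still be joined safely before the parent reads anything:

import multiprocessing as mp

def worker(shared_list):
    # Each append is one small request to the manager's server process,
    # which is always reading from its end of the connection, so the
    # writes never back up the way one huge Queue.put() can.
    for i in range(1000):
        shared_list.append([i] * 100)

if __name__ == '__main__':
    manager = mp.Manager()
    shared_list = manager.list()
    p = mp.Process(target=worker, args=(shared_list,))
    p.start()
    p.join()                  # Safe: nothing is left buffered in the child
    print len(shared_list)    # 1000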

Summary

Hopefully this clarifies why your "workaround" works: you are breaking the writes to the buffer up into smaller pieces, and a helper process reads them out of the buffer and puts them into the managed list.