multiprocessing - sharing a complex object

Published 2019-01-18 05:10

Question:

I've got a large dict-like object that needs to be shared between a number of worker processes. Each worker reads a random subset of the information in the object and does some computation with it. I'd like to avoid copying the large object as my machine quickly runs out of memory.

I was playing with the code for this SO question and I modified it a bit to use a fixed-size process pool, which is better suited to my use case. This however seems to break it.

from multiprocessing import Process, Pool
from multiprocessing.managers import BaseManager

class numeri(object):
    def __init__(self):
        self.nl = []

    def getLen(self):
        return len(self.nl)

    def stampa(self):
        print(self.nl)

    def appendi(self, x):
        self.nl.append(x)

    def svuota(self):
        for i in range(len(self.nl)):
            del self.nl[0]

class numManager(BaseManager):
    pass

def produce(listaNumeri):
    print('producing', id(listaNumeri))
    return id(listaNumeri)

def main():
    numManager.register('numeri', numeri, exposed=['getLen', 'appendi',
                        'svuota', 'stampa'])
    mymanager = numManager()
    mymanager.start()
    listaNumeri = mymanager.numeri()
    print(id(listaNumeri))

    print('------------ Process')
    for i in range(5):
        producer = Process(target=produce, args=(listaNumeri,))
        producer.start()
        producer.join()

    print('--------------- Pool')
    pool = Pool(processes=1)
    for i in range(5):
        pool.apply_async(produce, args=(listaNumeri,)).get()

if __name__ == '__main__':
    main()

The output is

4315705168
------------ Process
producing 4315705168
producing 4315705168
producing 4315705168
producing 4315705168
producing 4315705168
--------------- Pool
producing 4299771152
producing 4315861712
producing 4299771152
producing 4315861712
producing 4299771152

As you can see, in the first case all worker processes get the same object (by id). In the second case, the id is not the same. Does that mean the object is being copied?

P.S. I don't think it matters, but I am using joblib, which internally uses a Pool:

from joblib import delayed, Parallel

print('------------- Joblib')
Parallel(n_jobs=4)(delayed(produce)(listaNumeri) for i in range(5))

which outputs:

------------- Joblib
producing 4315862096
producing 4315862288
producing 4315862480
producing 4315862672
producing 4315862352

Answer 1:

I'm afraid virtually nothing here works the way you hope it works :-(

First note that identical id() values produced by different processes tell you nothing about whether the objects are really the same object. Each process has its own virtual address space, assigned by the operating system. The same virtual address in two processes can refer to entirely different physical memory locations. Whether your code produces the same id() output or not is pretty much purely accidental. Across multiple runs, sometimes I see different id() output in your Process section and repeated id() output in your Pool section, or vice versa, or both.

Second, a Manager supplies semantic sharing but not physical sharing. The data for your numeri instance lives only in the manager process. All your worker processes see (copies of) proxy objects. Those are thin wrappers that forward all operations to be performed by the manager process. This involves lots of inter-process communication, and serialization inside the manager process. This is a great way to write really slow code ;-) Yes, there is only one copy of the numeri data, but all work on it is done by a single process (the manager process).

To see this more clearly, make the changes @martineau suggested, and also change get_list_id() to this:

def get_list_id(self):  # added method
    import os
    print("get_list_id() running in process", os.getpid())
    return id(self.nl)

Here's sample output:

41543664
------------ Process
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 46268496
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 44153904
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
--------------- Pool
producing 41639248
get_list_id() running in process 5856
with list_id 44544608
producing 41777200
get_list_id() running in process 5856
with list_id 44544608
producing 41776816
get_list_id() running in process 5856
with list_id 44544608
producing 41777168
get_list_id() running in process 5856
with list_id 44544608
producing 41777136
get_list_id() running in process 5856
with list_id 44544608

Clear? The reason you see the same list id each time is not that each worker process holds the same self.nl member; it's that all numeri methods run in a single process (the manager process), so the list id is always the same.

If you're running on a Linux-y system (an OS that supports fork()), a much better idea is to forget all this Manager stuff and create your complex object at module level before starting any worker processes. Then the workers will inherit (address-space copies of) your complex object. The usual copy-on-write fork() semantics will make that about as memory-efficient as possible. That's sufficient if mutations don't need to be folded back into the main program's copy of the complex object. If mutations do need to be folded back in, then you're back to needing lots of inter-process communication, and multiprocessing becomes correspondingly less attractive.

There are no easy answers here. Don't shoot the messenger ;-)



Answer 2:

If you add two lines to your code, you'll find something very bizarre about this behavior:

def produce(listaNumeri):
    print('producing', id(listaNumeri))
    print(listaNumeri) # <- New line
    return id(listaNumeri)


def main():
    numManager.register('numeri', numeri, exposed=['getLen', 'appendi',
                        'svuota', 'stampa'])
    mymanager = numManager()
    mymanager.start()
    listaNumeri = mymanager.numeri()
    print(listaNumeri) # <- New line
    print(id(listaNumeri))

This gives you the following output:

<__main__.numeri object at 0x103892990>
4354247888
------------ Process
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
--------------- Pool
producing 4352988560
<__main__.numeri object at 0x103892990>
producing 4354547664
<__main__.numeri object at 0x103892990>
producing 4352988560
<__main__.numeri object at 0x103892990>
producing 4354547664
<__main__.numeri object at 0x103892990>
producing 4352988560
<__main__.numeri object at 0x103892990>

As you can see, the object printed is the same each time, but the id is not always the same. Also note the ids in the Pool section: they alternate between two values.

The answer for what is going on comes from printing the __class__ attribute during produce. On each run, the __class__ is actually

<class 'multiprocessing.managers.AutoProxy[numeri]'>

So the numeri object is wrapped in an AutoProxy each time, and the AutoProxy is not always the same. However, the numeri object being wrapped is the same in each call to produce. If you call the appendi method once in produce, then listaNumeri will end up with 10 items at the end of your program.



Answer 3:

You're confusing the numeri object instance with its Manager proxy, listaNumeri. This can be illustrated by making a few minor modifications to the code:

First, add a get_list_id method to class numeri(object) that returns the id of the actual internal data structure being used:

    ...                                                   
    def get_list_id(self):  # added method
        return id(self.nl)

Then modify produce() to use it:

def produce(listaNumeri):
    print('producing', id(listaNumeri))
    print(' with list_id', listaNumeri.get_list_id())  # added
    return id(listaNumeri)

Lastly, be sure to expose the new method as a part of the numManager interface:

def main():
    numManager.register('numeri', numeri, exposed=['getLen', 'appendi',
                                                   'svuota', 'stampa',
                                                   'get_list_id'])  # added
    ...                                                   

Afterwards you'll see something like the following output:

13195568
------------ Process
producing 12739600
 with list_id 13607080
producing 12739600
 with list_id 13607080
producing 12739600
 with list_id 13607080
producing 12739600
 with list_id 13607080
producing 12739600
 with list_id 13607080
--------------- Pool
producing 13690384
 with list_id 13607080
producing 13691920
 with list_id 13607080
producing 13691888
 with list_id 13607080
producing 13691856
 with list_id 13607080
producing 13691824
 with list_id 13607080

As this shows, even though each worker call can receive a different proxy object, they're all using (sharing) the same "managed" data object.