Python multiprocessing on Windows with a large array

Posted 2019-07-23 16:02

Question:

I wrote a script on a Linux platform using Python's multiprocessing module. When I tried running the program on Windows it did not work directly, which I found out is related to how child processes are created on Windows: it is crucial that the objects the child processes use can be pickled.

My main problem is that I am using large numpy arrays. It seems that above a certain size they are no longer picklable. To break it down to a simple script, I want to do something like this:

### Import modules

import numpy as np
import multiprocessing as mp

number_of_processes = 4

if __name__ == '__main__':

    def reverse_np_array(arr):
        arr = arr + 1
        return arr

    a = np.ndarray((200,1024,1280),dtype=np.uint16)

    def put_into_queue(_Queue,arr):
        _Queue.put(reverse_np_array(arr))


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],a)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
        list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()

I get the following error message:

Traceback (most recent call last):
  File "Windows_multi.py", line 34, in <module>
    Process_list[i].start()
  File "C:\Program Files\Anaconda32\lib\multiprocessing\process.py", line 130, i
n start
    self._popen = Popen(self)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 277, i
n __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 199, i
n dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())

So basically I am creating a large array that every process needs in order to do calculations on it and return the result.

One important thing seems to be to put the function definitions before the if __name__ == '__main__': statement.
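
A minimal sketch of that layout (the function names mirror the script above; the worker functions live at module level so the spawned child process, which re-imports the file, can find them by name; the smaller array shape is the one reported below as still working):

import numpy as np
import multiprocessing as mp

# Worker functions are defined at module level, outside the __main__ guard.
def reverse_np_array(arr):
    return arr + 1

def put_into_queue(queue, arr):
    queue.put(reverse_np_array(arr))

if __name__ == '__main__':
    # Process setup stays below the guard; otherwise the re-import in each
    # child would start spawning processes recursively.
    a = np.ndarray((50, 1024, 1280), dtype=np.uint16)
    q = mp.Queue()
    p = mp.Process(target=put_into_queue, args=(q, a))
    p.start()
    result = q.get()
    p.join()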

The whole thing works if I reduce the array to (50,1024,1280). However, even with 4 processes started and 4 cores working, it is slower than running the code without multiprocessing on a single core (on Windows). So I think I have another problem here.

In my real program the function in question later lives in a Cython module.

I am using the Anaconda distribution with 32-bit Python, since I could not get my Cython package compiled with the 64-bit version (I'll ask about that in a different thread).

Any help is welcome!!

Thanks! Philipp

UPDATE:

The first mistake I made was having the "put_into_queue" function definition inside the __main__ block.

Then I introduced shared arrays as suggested. However, this uses a lot of memory, and the memory used scales with the number of processes (which of course should not be the case). Any ideas what I am doing wrong here? It does not seem to matter where I place the definition of the shared array (inside or outside __main__), though I think it should be inside __main__. I got this from this post: Is shared readonly data copied to different processes for Python multiprocessing?

import numpy as np
import multiprocessing as mp
import ctypes


shared_array_base = mp.Array(ctypes.c_uint, 1280*1024*20)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
#print shared_array
shared_array = shared_array.reshape(20,1024,1280)

number_of_processes = 4

def put_into_queue(_Queue,arr):
    _Queue.put(reverse_np_array(arr))
def reverse_np_array(arr):
    arr = arr + 1 + np.random.rand()
    return arr
if __name__ == '__main__':


    #print shared_arra

    #a = np.ndarray((50,1024,1280),dtype=np.uint16)


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],shared_array)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
        list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()
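
For comparison, a minimal sketch of the pattern where only the raw shared Array is handed to the children and the numpy view is rebuilt inside each worker, so that nothing is pickled through a Queue (this is an assumption about the intent; the per-process slab indexing is just for illustration):

import numpy as np
import multiprocessing as mp
import ctypes

shape = (20, 1024, 1280)

def worker(shared_base, shape, slab):
    # Re-wrap the shared buffer inside the child; this creates a view,
    # not a copy, so memory use does not scale with the process count.
    arr = np.ctypeslib.as_array(shared_base.get_obj()).reshape(shape)
    arr[slab] += 1  # each process modifies its own slab in place

if __name__ == '__main__':
    shared_base = mp.Array(ctypes.c_uint, shape[0] * shape[1] * shape[2])

    processes = []
    for i in range(4):
        # Pass the Array object itself; a numpy view of it would be
        # pickled (i.e. copied) when the child process is spawned.
        p = mp.Process(target=worker, args=(shared_base, shape, i))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    result = np.ctypeslib.as_array(shared_base.get_obj()).reshape(shape)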

Answer 1:

You didn't include the full traceback; the end is most important. On my 32-bit Python I get the same traceback that finally ends in

  File "C:\Python27\lib\pickle.py", line 486, in save_string
    self.write(BINSTRING + pack("<i", n) + obj)
MemoryError

MemoryError is the exception and it says you ran out of memory.

64-bit Python would get around this, but sending large amounts of data between processes can easily become a serious bottleneck in multiprocessing.
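
For a rough sense of the numbers (a quick check using the array shape from the question; the 2 GB figure assumes the default 32-bit process limit):

import numpy as np

a = np.ndarray((200, 1024, 1280), dtype=np.uint16)
print(a.nbytes)   # 524288000, i.e. roughly 500 MB for a single array
# Pickling the array for the child process needs at least one more full
# copy of that buffer, which a 32-bit process (about 2 GB of address
# space by default) can easily fail to allocate -- hence the MemoryError.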