I am trying to parallelise some operations on a large numpy array using mpi4py. I am currently using numpy.array_split to divide the array into chunks, then comm.scatter to send the chunks to the different cores, and then comm.gather to collect the resulting arrays. A minimal (not) working example is below:
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    test = np.random.rand(411, 48, 52, 40)
    test_chunks = np.array_split(test, size, axis=0)
else:
    test_chunks = None

test_chunk = comm.scatter(test_chunks, root=0)
output_chunk = np.zeros([np.shape(test_chunk)[0], 128, 128, 128])

for i in range(0, np.shape(test_chunk)[0], 1):
    print(i)
    output_chunk[i, 0:48, 0:52, 0:40] = test_chunk[i]

outputData = comm.gather(output_chunk, root=0)

if rank == 0:
    outputData = np.concatenate(outputData, axis=0)
Running this gives me the error:
File "test_4d.py", line 23, in <module>
outputData = comm.gather(output_chunk,root=0)
File "Comm.pyx", line 869, in mpi4py.MPI.Comm.gather (src/mpi4py.MPI.c:73266)
File "pickled.pxi", line 614, in mpi4py.MPI.PyMPI_gather (src/mpi4py.MPI.c:33592)
File "pickled.pxi", line 146, in mpi4py.MPI._p_Pickle.allocv (src/mpi4py.MPI.c:28517)
File "pickled.pxi", line 95, in mpi4py.MPI._p_Pickle.alloc (src/mpi4py.MPI.c:27832)
SystemError: Negative size passed to PyString_FromStringAndSize
This error seems to result from the large size of the numpy arrays being collected by gather: since the lowercase comm.scatter and comm.gather pickle the arrays before sending them, it appears easy to exceed the maximum message size that this pickle-based path can handle. One suggestion I have come across is to use the uppercase comm.Scatter and comm.Gather, which operate on buffer-like objects instead. However, I am struggling to find clear documentation for these functions and so far have been unable to implement them successfully. For example:
replacing
outputData = comm.gather(output_chunk,root=0)
with the line
outputData = comm.Gather(sendbuf=[test_chunks, MPI.DOUBLE], recvbuf=[output_chunk, MPI.DOUBLE], root=0)
gives the error:
File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
File "message.pxi", line 51, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19644)
File "asbuffer.pxi", line 108, in mpi4py.MPI.getbuffer (src/mpi4py.MPI.c:6757)
File "asbuffer.pxi", line 50, in mpi4py.MPI.PyObject_GetBufferEx (src/mpi4py.MPI.c:6093)
TypeError: expected a readable buffer object
or with the line:
outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)
gives the error:
File "test_4d_2.py", line 24, in <module>
outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)
File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
File "message.pxi", line 60, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19747)
TypeError: unhashable type: 'numpy.ndarray'
Furthermore, the input matrix, test, may also increase in size, which could cause similar problems for comm.scatter. Aside from the problems I already have with comm.Gather, I am not sure how to set up comm.Scatter, since recvbuf is defined based on the size of test_chunk, which is the output of comm.scatter, and hence I can't specify recvbuf within comm.Scatter.
The solution is to use comm.Scatterv and comm.Gatherv, which send and receive the data as a block of memory rather than as a list of numpy arrays, getting around the data size issue. comm.Scatterv and comm.Gatherv assume a block of data in C-order (row-major) in memory, and it is necessary to specify two vectors, sendcounts and displacements. sendcounts gives the number of elements to send to each core, while displacements gives the index in the flattened input data at which each core's block starts. Hence it is possible to vary the amount of data sent to each core. More details can be found here: http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html
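As a rough illustration of what these two vectors contain (a toy 1D split, not the array from the question; the variable names are just for illustration):

import numpy as np

# Toy case: 10 elements split across 3 cores, matching np.array_split's split.
chunks = np.array_split(np.arange(10), 3)
sendcounts = np.array([len(c) for c in chunks])               # [4, 3, 3] -> elements per core
displacements = np.insert(np.cumsum(sendcounts), 0, 0)[:-1]   # [0, 4, 7] -> start index of each core's block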
An example using comm.Scatterv and comm.Gatherv for a 2D matrix is given here: Along what axis does mpi4py Scatterv function split a numpy array?
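For reference, below is a rough sketch of how the example from the question might be adapted to comm.Scatterv and comm.Gatherv. The bookkeeping names (split_sizes, send_counts, send_displs, recv_counts, recv_displs) and the way the counts are computed are my own assumptions about how to adapt the question's code, not anything prescribed by mpi4py:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Shapes taken from the question.
in_shape = (411, 48, 52, 40)
out_item_shape = (128, 128, 128)

if rank == 0:
    test = np.random.rand(*in_shape)
    # Number of rows (along axis 0) for each core, same split as np.array_split.
    split_sizes = [len(c) for c in np.array_split(np.arange(in_shape[0]), size)]
else:
    test = None
    split_sizes = None

# Every core needs the split sizes to allocate its receive buffer.
split_sizes = comm.bcast(split_sizes, root=0)
rows = split_sizes[rank]

# Counts and displacements are in elements of the flattened, C-ordered buffer.
items_per_row_in = int(np.prod(in_shape[1:]))          # 48*52*40 elements per row
send_counts = np.array(split_sizes) * items_per_row_in
send_displs = np.insert(np.cumsum(send_counts), 0, 0)[:-1]

test_chunk = np.empty((rows,) + in_shape[1:], dtype='d')
sendbuf = [test, send_counts, send_displs, MPI.DOUBLE] if rank == 0 else None
comm.Scatterv(sendbuf, test_chunk, root=0)

# Same per-row processing as in the question.
output_chunk = np.zeros((rows,) + out_item_shape)
for i in range(rows):
    output_chunk[i, 0:48, 0:52, 0:40] = test_chunk[i]

# Gather the processed chunks back into one contiguous block on the root core.
items_per_row_out = int(np.prod(out_item_shape))
recv_counts = np.array(split_sizes) * items_per_row_out
recv_displs = np.insert(np.cumsum(recv_counts), 0, 0)[:-1]

if rank == 0:
    outputData = np.empty((in_shape[0],) + out_item_shape, dtype='d')
    recvbuf = [outputData, recv_counts, recv_displs, MPI.DOUBLE]
else:
    outputData = None
    recvbuf = None
comm.Gatherv(output_chunk, recvbuf, root=0)

Note that the counts and displacements are in elements, not rows, so splitting along axis 0 means multiplying each core's row count by the number of elements in a single row.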