MPI_Type_create_subarray and MPI_Gather

2019-01-08 13:39发布

问题:

I have to solve a little mpi problem. I have 4 slaves processes and each of these wants to send a 2d subarray (CHUNK_ROWS X CHUNK_COLUMNS) to master 0. Master 0 collects all chunks in ddd[ROWS][COLUMNS] and print it. I want to use MPI_Gather()

#include <mpi.h>
#include <iostream>
using namespace std;

#define ROWS 10
#define COLUMNS 10
#define CHUNK_ROWS 5
#define CHUNK_COLUMNS 5
#define TAG 0

int** alloca_matrice(int righe, int colonne)
{
int** matrice=NULL;
int i;

matrice = (int **)malloc(righe * sizeof(int*));

if(matrice != NULL){
  matrice[0] = (int *)malloc(righe*colonne*sizeof(int));
  if(matrice[0]!=NULL)
    for(i=1; i<righe; i++)
        matrice[i] = matrice[0]+i*colonne;
  else{
    free(matrice);
    matrice = NULL;
  }
}
else{
  matrice = NULL;
}
return matrice;

}

int main(int argc, char* argv[])
{

int my_id, numprocs,length,i,j;
int ndims, sizes[2],subsizes[2],starts[2];
int** DEBUG_CH=NULL;
int** ddd=NULL;
char name[BUFSIZ];
MPI_Datatype subarray=NULL;
//MPI_Status status;
MPI_Init(&argc, &argv) ;    
MPI_Comm_rank(MPI_COMM_WORLD, &my_id) ;
MPI_Comm_size(MPI_COMM_WORLD, &numprocs) ;  // Ottiene quanti processi sono attivi
MPI_Get_processor_name(name, &length);    

if(my_id!=0){
  //creo una sottomatrice ripulita dalle ghost cells
  ndims=2;
  sizes[0] = CHUNK_ROWS+2;
  sizes[1] = CHUNK_COLUMNS+2;
  subsizes[0] = CHUNK_ROWS;
  subsizes[1] = CHUNK_COLUMNS;
  starts[0] = 1;
  starts[1] = 1;
  MPI_Type_create_subarray(ndims,sizes,subsizes,starts,MPI_ORDER_C,MPI_INT,&subarray);
  MPI_Type_commit(&subarray);

  DEBUG_CH = alloca_matrice(CHUNK_ROWS+2,CHUNK_COLUMNS+2);
  for(i=0; i<CHUNK_ROWS+2; i++){
    for(j=0; j<CHUNK_COLUMNS+2; j++){
        if(i==0 || i==CHUNK_ROWS+1 || j==0 || j==CHUNK_COLUMNS+1)
            DEBUG_CH[i][j] = 5;
        else
            DEBUG_CH[i][j] = 1;
    }
  }
//MPI_Send(DEBUG_CH[0],1,subarray,0,TAG,MPI_COMM_WORLD);
}
if(my_id==0){
 ddd = alloca_matrice(ROWS,COLUMNS);
}

MPI_Gather(DEBUG_CH[0],1,subarray,ddd[0],CHUNK_ROWS*CHUNK_COLUMNS,MPI_INT,0,MPI_COMM_WORLD);
if(!my_id){
  for(i=0; i<ROWS; i++){
    for(j=0; j<COLUMNS; j++){
        printf("%d ",ddd[i][j]);
    }
    printf("\n");
  }
}

if(my_id)
 MPI_Type_free(&subarray);

MPI_Finalize();                             // Chiusura di MPI.
return 0;
}

Thanks all.

回答1:

So this is a little more subtle, and requires some understanding of how the Gather collective places complex types.

If you look at most examples of MPI_Gather, they're of 1-d arrays, and it's fairly easy to interpret what should happen; you're getting (say) 10 ints from each process, and Gather is smart enough to put the 10 ints from rank 0 at the start, the 10 from rank 1 at positions 10-19 in the array, and so on.

More complex layouts like this are a little more complicated, though. First, the data layout from the sender's point of view is different from the data layout from the receivers. In the sender's point of view, you start at array element [1][2], go to [1][5] (in an array of size 7x7), then jump to array elements [2][3]-[2][5], etc. There are CHUNK_ROWS blocks of data, each separated by 2 ints.

Now consider how the reciever has to receive them. Let's say it's receiving rank 0's data. It's going to receive that into array elements [0][0]-[0][4] -- so far so good; but then it's going to receive the next block of data into [1][0]-[1][4], in an array of size 10x10. That's a jump over 5 elements. The layout in memory is different. So the receiver will have to be recieving into a different Subarray type then the senders are sending from, because the memory layout is different.

So while you might be sending from something that looks like this:

  sizes[0] = CHUNK_ROWS+2;
  sizes[1] = CHUNK_COLUMNS+2;
  subsizes[0] = CHUNK_ROWS;
  subsizes[1] = CHUNK_COLUMNS;
  starts[0] = 1;
  starts[1] = 1;
  MPI_Type_create_subarray(ndims,sizes,subsizes,starts,MPI_ORDER_C,MPI_INT,&sendsubarray);
  MPI_Type_commit(&sendsubarray);

you'll be receiving into something that looks like this:

  sizes[0]    = ROWS; sizes[1] = COLUMNS;
  subsizes[0] = CHUNK_ROWS; subsizes[1] = CHUNK_COLUMNS;
  starts[0]   = 0; starts[1] = 0;
  MPI_Type_create_subarray(ndims,sizes,subsizes,starts,MPI_ORDER_C,MPI_INT,&recvsubarray);
  MPI_Type_commit(&recvsubarray);

Crucially, notice the difference in the sizes array.

Now we're getting a little closer. Notice your MPI_Gather line changes to something like this:

MPI_Gather(DEBUG_CH[0],1,sendsubarray,recvptr,1,recvsubarray,0,MPI_COMM_WORLD);

There were a couple things that didn't work about the previous version, MPI_Gather(DEBUG_CH[0],1,subarray,ddd[0],CHUNK_ROWS*CHUNK_COLUMNS,MPI_INT,0,MPI_COMM_WORLD); -- first, note that you're referencing ddd[0], but for every rank except rank 0, ddd=NULL, and so this will fail. So create a new variable called say recvptr, and in rank zero, set that to ddd[0]. (It doesn't matter where the other processes think it is, as they're not receiving.) Also, I think you don't want to be receiving CHUNK_ROWS*CHUNK_COLUMS MPI_INTs, because that would place them contiguously in memory, and my understanding is you want them layed out the same way as on the slave tasks, but in the larger array.

Ok, so now we're getting somewhere, but the above still won't work, for an interesting reason. For the 1d array examples, it's easy enough to figure out where the nth ranks data goes. The way it's calculated is by finding the extent of the data being recieved, and starting the next element just after that one. But that won't work here. "Just after" the end of rank zero's data is not where rank one's data should start ([0][5]) but instead, [4][5] -- the element after the last element in rank 0s subarray. Here, the data you're reciving from different ranks overlaps! So we're going to have to fiddle with the extents of the data types, and manually specify where each rank's data starts. The second is the easy part; you use the MPI_Gatherv function when you need to manually specify the amount of data from each processor, or where it goes. The first is the trickier part.

MPI let's you specify the lower and upper bounds of a given data type -- where, given a piece of memory, the first bit of data for this type would go, and where it "ends", which here only means where the next one could start. (The data can extend past the upper bound of the type, which I'd argue makes those names misleading, but such is the way of things.) You can specify this to be anything you like that makes it convenient for you; since we'll be dealing with elements in an int array, let's make the extent of our type one MPI_INT in size.

  MPI_Type_create_resized(recvsubarray, 0, 1*sizeof(int), &resizedrevsubarray);
  MPI_Type_commit(&resizedrecvsubarray);

(Note we only have to do this for the received type; from the send type, since we're only sending one of them, it doesn't matter).

Now, we'll use gatherv to specify where each element starts -- in units of the "size" of this new resized type, which is just 1 integer. So if we want something to go into the large array at [0][5], the displacement from the start of the large array is 5; if we want it to go in there at position [5][5], the displacement is 55.

Finally, notice that the gather and scatter collectives all assume that even the "master" is participating. It's easiest to get this working if even the master has their own piece of the global array.

So with that, the following works for me:

#include <mpi.h>
#include <iostream>
#include <cstdlib>
using namespace std;

#define ROWS 10
#define COLUMNS 10
#define CHUNK_ROWS 5
#define CHUNK_COLUMNS 5
#define TAG 0

int** alloca_matrice(int righe, int colonne)
{
    int** matrice=NULL;
    int i;

    matrice = (int **)malloc(righe * sizeof(int*));

    if(matrice != NULL){
        matrice[0] = (int *)malloc(righe*colonne*sizeof(int));
        if(matrice[0]!=NULL)
            for(i=1; i<righe; i++)
                matrice[i] = matrice[0]+i*colonne;
        else{
            free(matrice);
            matrice = NULL;
        }
    }
    else{
        matrice = NULL;
    }
    return matrice;

}

int main(int argc, char* argv[])
{

    int my_id, numprocs,length,i,j;
    int ndims, sizes[2],subsizes[2],starts[2];
    int** DEBUG_CH=NULL;
    int** ddd=NULL;
    int *recvptr=NULL;
    char name[BUFSIZ];
    MPI_Datatype sendsubarray;
    MPI_Datatype recvsubarray;
    MPI_Datatype resizedrecvsubarray;
    //MPI_Status status;
    MPI_Init(&argc, &argv) ;    
    MPI_Comm_rank(MPI_COMM_WORLD, &my_id) ;
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs) ;  // Ottiene quanti processi sono attivi
    if (numprocs != 4) {
        MPI_Abort(MPI_COMM_WORLD,1);
    }
    MPI_Get_processor_name(name, &length);    

    //creo una sottomatrice ripulita dalle ghost cells
    ndims=2;
    sizes[0] = CHUNK_ROWS+2;
    sizes[1] = CHUNK_COLUMNS+2;
    subsizes[0] = CHUNK_ROWS;
    subsizes[1] = CHUNK_COLUMNS;
    starts[0] = 1;
    starts[1] = 1;
    MPI_Type_create_subarray(ndims,sizes,subsizes,starts,MPI_ORDER_C,MPI_INT,&sendsubarray);
    MPI_Type_commit(&sendsubarray);

    DEBUG_CH = alloca_matrice(CHUNK_ROWS+2,CHUNK_COLUMNS+2);
    for(i=0; i<CHUNK_ROWS+2; i++){
        for(j=0; j<CHUNK_COLUMNS+2; j++){
            if(i==0 || i==CHUNK_ROWS+1 || j==0 || j==CHUNK_COLUMNS+1)
                DEBUG_CH[i][j] = 5;
            else
                DEBUG_CH[i][j] = my_id;
        }
    }

    recvptr=DEBUG_CH[0];
    if(my_id==0){
        ddd = alloca_matrice(ROWS,COLUMNS);
        sizes[0]    = ROWS; sizes[1] = COLUMNS;
        subsizes[0] = CHUNK_ROWS; subsizes[1] = CHUNK_COLUMNS;
        starts[0]   = 0; starts[1] = 0;
        MPI_Type_create_subarray(2,sizes,subsizes,starts,MPI_ORDER_C,MPI_INT,&recvsubarray);
        MPI_Type_commit(&recvsubarray);
        MPI_Type_create_resized(recvsubarray, 0, 1*sizeof(int), &resizedrecvsubarray);
        MPI_Type_commit(&resizedrecvsubarray);
        recvptr = ddd[0];
    }

    int counts[5]={1,1,1,1};
    int disps[5] ={0,5,50,55};
    MPI_Gatherv(DEBUG_CH[0],1,sendsubarray,recvptr,counts,disps,resizedrecvsubarray,0,MPI_COMM_WORLD);
    if(!my_id){
        for(i=0; i<ROWS; i++){
            for(j=0; j<COLUMNS; j++){
                printf("%d ",ddd[i][j]);
            }
            printf("\n");
        }
    }

    if(my_id == 0) {
        MPI_Type_free(&resizedrecvsubarray);
        MPI_Type_free(&recvsubarray);
        free(ddd[0]);
        free(ddd);
    } else {
        MPI_Type_free(&sendsubarray);
        free(DEBUG_CH[0]);
        free(DEBUG_CH);
    }

    MPI_Finalize();                             // Chiusura di MPI.
    return 0;
}