Search/wait for any transmission in MS-MPI

Posted 2019-08-10 12:17

Question:

This is a master-slave setup. How can I make the master process check, in a non-blocking way, for a message sent to it? If no message has been sent to the master at the time of the check, it should continue with its iterations. If a message has been sent, the master should process it and then continue with its iterations. See the comment inside /* */.

int main(int argc, char *argv[])
{
    int numprocs,
        rank;

    MPI_Request request;
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    if(rank == 0) // the searching process
    {
        for (int i=0; i < 4000000; i++)
        {   
            // do some stuff here; does not matter what

            /* see if any message has been transmitted to me at this point
             without blocking the process; if one has been transmitted by
             this time, do something and then continue with the for iterations;
             otherwise just continue with the for iterations, and maybe next
             time there will be a message that sends me to do something */
        }
    }
    else
    {
        int flag = 1;
        while(flag) 
        {
            // something done that at some point changes flag
        }

        // send a message to process with rank 0 and don't get stuck here
        MPI_Isend(12, 1, MPI_INT, 0, 100, MPI_COMM_WORLD, &request);

        // some other stuff done

        // wait for message to be transmitted
        MPI_Wait(&request, &status);
    }

    MPI_Finalize();
    return 0;
}

Answer 1:

One solution is to use MPI_Iprobe() to test whether a message is waiting.

  • On this line, pass a pointer to the data instead of the literal "12":

    MPI_Isend(12, 1, MPI_INT, 0, 100, MPI_COMM_WORLD, &request);
    
  • Set flag=0 inside this loop so that it terminates:

    while(flag!=0) 
     {
        // something done that at some point changes flag
     }
    

Here is the code:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[])
{
    int numprocs,
        rank;

    MPI_Request request;
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    if(rank == 0) // the searching process
    {
        int i;
        for (i=0; i < 4000000; i++)
        {
            // do some stuff here; does not matter what
            //printf("I am still running!\n");

            /* see if any message has been transmitted to me at this point
             without blocking the process; if one has been transmitted by
             this time, do something and then continue with the for iterations;
             otherwise just continue with the for iterations, and maybe next
             time there will be a message that sends me to do something */
            int flag;
            MPI_Iprobe(MPI_ANY_SOURCE, 100, MPI_COMM_WORLD, &flag, &status);
            if(flag != 0)
            {
                // receive from exactly the source and tag that the probe reported
                int value;
                MPI_Recv(&value, 1, MPI_INT, status.MPI_SOURCE, status.MPI_TAG, MPI_COMM_WORLD, &status);
                printf("I (0) received %d \n", value);
            }
        }
    }
    else
    {
        int i;
        for(i=0; i<42; i++)
        {
            int flag = 1;
            while(flag != 0)
            {
                // something done that at some point changes flag
                flag = 0;
            }

            int bla = 1000*rank + i;
            // send a message to process with rank 0 and don't get stuck here
            MPI_Isend(&bla, 1, MPI_INT, 0, 100, MPI_COMM_WORLD, &request);

            // some other stuff done
            printf("I (%d) do something\n", rank);
            // wait for message to be transmitted
            MPI_Wait(&request, &status);
        }
    }

    MPI_Finalize();
    return 0;
}

Bye,

Francis



Answer 2:

The non-blocking test for available messages is done using the MPI_Iprobe call. In your case it would look like:

int available;
MPI_Status status;

if(rank == 0) // the searching process
{
    for (int i=0; i < 4000000; i++)
    {   
        // do some stuff here; does not matter what

        /* see if any message has been transmitted to me at this point
         without blocking the process; if one has been transmitted by
         this time, do something and then continue with the for iterations;
         otherwise just continue with the for iterations, and maybe next
         time there will be a message that sends me to do something */

        // Tag value 100 matches the value used in the send operation
        MPI_Iprobe(MPI_ANY_SOURCE, 100, MPI_COMM_WORLD, &available, &status);
        if (available)
        {
            // Message source rank is now available in status.MPI_SOURCE
            // Receive the message
            MPI_Recv(..., status.MPI_SOURCE, status.MPI_TAG, MPI_COMM_WORLD, &status);
        }
    }
}

MPI_ANY_SOURCE is used as a wildcard rank, i.e. it instructs MPI_Iprobe to check for messages from any source. If a matching message is available, then available will be set to true, otherwise it will be set to false. The actual source of the message is also written to the MPI_SOURCE field of the status object. If the available flag indicates that a matching message is available, one should then post a receive operation in order to receive it. It is important that the source rank and the tag are explicitly specified in the receive operation (taken from the status object), otherwise a different message could be received instead.

You could also use persistent communication requests. These behave very much like non-blocking operations, with the important difference that they can be restarted multiple times. The same code with a persistent receive request would look like:

if(rank == 0) // the searching process
{
    MPI_Request req;
    MPI_Status status;
    int completed;

    // Prepare the persistent connection request
    MPI_Recv_init(buffer, buf_size, buf_type,
                  MPI_ANY_SOURCE, 100, MPI_COMM_WORLD, &req);
    // Make the request active
    MPI_Start(&req);

    for (int i=0; i < 4000000; i++)
    {   
        // do some stuff here; does not matter what

        /* see if any message has been transmitted to me at this point
         without blocking the process; if one has been transmitted by
         this time, do something and then continue with the for iterations;
         otherwise just continue with the for iterations, and maybe next
         time there will be a message that sends me to do something */

        // Non-blocking Test for request completion
        MPI_Test(&req, &completed, &status);
        if (completed)
        {
            // Message is now in buffer
            // Process the message
            // ...
            // Activate the request again
            MPI_Start(&req);
        }
    }

    // Cancel and free the request
    MPI_Cancel(&req);
    MPI_Request_free(&req);
}

Persistent operations have a slight performance edge over the non-persistent ones used in the first code sample. It is important that buffer is not accessed while the request is active, i.e. after the call to MPI_Start and before MPI_Test signals completion. Persistent send/receive operations also match non-persistent receive/send operations, so the worker code does not need to change and the workers can still use MPI_Isend.



Tags: c mpi