MPI asynchronous broadcast from unknown source

Posted 2019-01-29 12:11

I have a C project with n processes working on a kind of tree search. At any given time, any of these processes may find something of interest and want to send it to all other processes asynchronously.

How can I listen for new messages on the other processes without having to loop through all possible senders on each loop iteration?

I have read other questions about this, for example this one (MPI - Asynchronous Broadcast/Gather); however, everything I've seen so far either doesn't handle unpredictable senders or loops through every possible sender, which I'd rather avoid.

EDIT for clarification:

Sending the found value to a root rank and distributing it from there is out of the question. Zulan's answer would work if I did not have that constraint, so it might still help others. In my case it can (and almost certainly will) happen that different ranks find something they need to share multiple times (which also means that race conditions could occur).

4 Answers
Animai°情兽
Answer 2 · 2019-01-29 12:34

You can choose a root rank and combine an asynchronous send with an asynchronous broadcast that originates from this root. Here is a small example:

#include <stdio.h>
#include <mpi.h>

int main()
{
    int size, rank;
    MPI_Init(NULL, NULL);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    const int root = 0;
    const int found_tag = 42;
    const long long chunk = 100000ll;
    const long long target = 134861523ll;

    MPI_Request found_request;
    long long found_item;
    if (rank == root) {
        MPI_Irecv(&found_item, 1, MPI_LONG_LONG, MPI_ANY_SOURCE, found_tag, MPI_COMM_WORLD, &found_request);
    } else {
        MPI_Ibcast(&found_item, 1, MPI_LONG_LONG, root, MPI_COMM_WORLD, &found_request);
    }

    for (long long my_number = rank;; my_number += size) {
        if (my_number == target) {
            found_item = my_number;
            printf("I found it: %d, %lld\n", rank, found_item);
            // We don't stop here, we will continue until the root tells us to stop
            // This also works if we are the root
            MPI_Send(&my_number, 1, MPI_LONG_LONG, root, found_tag, MPI_COMM_WORLD);
        }
        // Avoid checking MPI_Test too often.
        if ((1 + (my_number / size)) % chunk == 0) {
            if (rank == root) {
                int found = 0;
                MPI_Test(&found_request, &found, MPI_STATUS_IGNORE);
                if (found) {
                    printf("Someone told me that he found it: %d, %lld\n", rank, found_item);
                    MPI_Request bcast_request;
                    MPI_Ibcast(&found_item, 1, MPI_LONG_LONG, root, MPI_COMM_WORLD, &bcast_request);
                    MPI_Wait(&bcast_request, MPI_STATUS_IGNORE);
                    break;
                }
            } else {
                int found = 0;
                MPI_Test(&found_request, &found, MPI_STATUS_IGNORE);
                if (found) {
                    break;
                }
            }
        }
    }
    printf("%d, %lld\n", rank, found_item);
    MPI_Finalize();
}

This code assumes that exactly one number is found, and that it is found only once. Handling multiple finds will require some additional code.

As you noted, you cannot post a broadcast with an unknown sender. You could instead post an MPI_Allreduce (or even MPI_Allgather), after which all ranks would know the found value. However, you cannot simply post a single asynchronous one up front, because you cannot change the value after you have posted it.

甜甜的少女心
Answer 3 · 2019-01-29 12:39

First of all, like other collective primitives, the MPI broadcast operation requires all processes to participate: if you want to use a collective operation, every process has to reach the call. This is because the MPI broadcast primitive performs both the send and the receive within the same collective call (see: Corresponding Receive Routine of MPI_Bcast).

This abstraction usually allows non-naive implementations of the collectives, in which all processes can actually contribute to the broadcast. This is well explained here: http://mpitutorial.com/tutorials/mpi-broadcast-and-collective-communication/

If you want to be notified asynchronously about each value that is found, because it could contribute in some way to your algorithm, then looping over the processes with an asynchronous send is probably your best bet, especially if these are small values.

In your question, you seem concerned mainly about looping while listening for messages. Note that you can avoid the receive loop by using MPI_ANY_SOURCE as the source parameter when probing and receiving messages. This avoids the loop over senders, but each probe/receive only handles one message, so depending on what you want to do, you may want to keep receiving until there are no more messages in the queue, or add something more sophisticated to prevent a flood of messages from stalling a process's own work.
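
For illustration, here is a minimal sketch of that probe/receive pattern; the tag constant and the helper function are made up for this example:

#include <stdio.h>
#include <mpi.h>

#define FOUND_TAG 42

/* Call this periodically from the search loop; it drains every pending
 * "found" message, regardless of which rank sent it. */
static void drain_found_messages(void)
{
    int pending;
    MPI_Status status;
    for (;;) {
        MPI_Iprobe(MPI_ANY_SOURCE, FOUND_TAG, MPI_COMM_WORLD, &pending, &status);
        if (!pending)
            break;
        long long value;
        MPI_Recv(&value, 1, MPI_LONG_LONG, status.MPI_SOURCE, FOUND_TAG,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("Received %lld from rank %d\n", value, status.MPI_SOURCE);
    }
}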

啃猪蹄的小仙女
Answer 4 · 2019-01-29 12:41

You may get away with sending the values at periodic synchronization points. Instead of sending a found number immediately, you can delay its delivery and collect the numbers locally until the next synchronization point. At that point you send out a whole batch at once; since you send a bunch at a time, this may justify the overhead of creating the synchronization points.

The synchronization points could be created with an allgather: each process sends the count of its locally collected numbers to all others. The allgather acts like a barrier, after which you can do the actual broadcasting. After that barrier, all processes know how large the broadcasts will be and how many items they contain.

Allgather also just sends numbers, so why not send your found number straight away? Because the allgather operations may be relatively "empty" (many ranks may have nothing to contribute), and the other processes would not know at what point in time it occurs. If you pick fixed synchronization points, everyone knows when to synchronize and may have more than one number to transfer.
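
Here is a rough sketch of one such synchronization point, assuming each rank has collected local_count values in local_vals (all names are illustrative):

#include <stdlib.h>
#include <mpi.h>

/* Exchange every rank's locally collected numbers at a synchronization point.
 * Returns a freshly allocated array containing all values from all ranks. */
long long *exchange_found(const long long *local_vals, int local_count,
                          int *total_count, MPI_Comm comm)
{
    int size;
    MPI_Comm_size(comm, &size);

    /* Step 1: everyone learns how many items each rank contributes. */
    int *counts = malloc(size * sizeof(int));
    MPI_Allgather(&local_count, 1, MPI_INT, counts, 1, MPI_INT, comm);

    /* Step 2: compute displacements and gather the actual values. */
    int *displs = malloc(size * sizeof(int));
    int total = 0;
    for (int i = 0; i < size; ++i) {
        displs[i] = total;
        total += counts[i];
    }
    long long *all_vals = malloc((total > 0 ? total : 1) * sizeof(long long));
    MPI_Allgatherv(local_vals, local_count, MPI_LONG_LONG,
                   all_vals, counts, displs, MPI_LONG_LONG, comm);

    free(counts);
    free(displs);
    *total_count = total;
    return all_vals;
}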

Another option may be to look into the RMA (Remote Memory Access, also called "one-sided") operations of MPI-3. They are called one-sided because the receiver does not need to call a corresponding receive, so you do not need to know the root in order to receive broadcast data. Maybe you can construct an intelligent put/get scheme with these operations.
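
For illustration only, a very rough one-sided sketch, assuming a single flag/value pair per rank; repeated finds or concurrent writers would need more care (for example atomic updates via MPI_Accumulate):

#include <stdio.h>
#include <mpi.h>

int main(void)
{
    int rank, size;
    MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* local[0] = "found" flag, local[1] = the value itself */
    long long local[2] = {0, 0};
    MPI_Win win;
    MPI_Win_create(local, 2 * sizeof(long long), sizeof(long long),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    if (rank == 0) {  /* pretend rank 0 found something */
        long long update[2] = {1, 134861523ll};
        for (int target = 0; target < size; ++target) {
            MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);
            MPI_Put(update, 2, MPI_LONG_LONG, target, 0, 2, MPI_LONG_LONG, win);
            MPI_Win_unlock(target, win);
        }
    }

    /* Every rank polls its own window between chunks of work. */
    long long snapshot[2] = {0, 0};
    while (snapshot[0] == 0) {
        MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, win);
        snapshot[0] = local[0];
        snapshot[1] = local[1];
        MPI_Win_unlock(rank, win);
    }

    printf("rank %d saw value %lld\n", rank, snapshot[1]);
    MPI_Win_free(&win);
    MPI_Finalize();
    return 0;
}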

I haven't found a nice solution for this myself; I still have a similar problem. My best idea so far: start a broadcast listener thread for each possible broadcast root on each process. Of course, that creates considerable overhead: you need the threads, and each one has to use its own MPI communicator per broadcast source. The listeners push the received data into a common message queue, and there you go.

chillily
Answer 5 · 2019-01-29 12:59

After a lot of trial and error (and other feedback) I found that the most performant solution uses asynchronous broadcasts.

My first approach, inspired by Petru's answer, used multiple non-blocking MPI_Isend() calls to distribute data and only a single MPI_Irecv() call (with any source) to periodically receive data. This approach turned out to be quite slow, because the sender has to check every single MPI_Request handle created by the non-blocking MPI_Isend() calls with MPI_Test(). The reason is that asynchronous operations aren't truly asynchronous in the sense of running in the background; they have to be progressed by checking them periodically.
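
For reference, here is roughly what the sending side of that first approach boils down to (names are illustrative); the sender ends up owning size - 1 request handles per found value:

#include <mpi.h>

/* Post one non-blocking send per other rank. `value` must stay valid and
 * `reqs` must have room for size - 1 entries; the caller then has to drive
 * all of these requests to completion with MPI_Testall()/MPI_Waitall(),
 * which is the per-value bookkeeping described above. */
void send_to_everyone(const long long *value, int rank, int size,
                      MPI_Request *reqs)
{
    int n = 0;
    for (int dest = 0; dest < size; ++dest)
        if (dest != rank)
            MPI_Isend(value, 1, MPI_LONG_LONG, dest, 42,
                      MPI_COMM_WORLD, &reqs[n++]);
}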

In my case the problem also means that new solutions can (and will) be found multiple times, which resulted in a lot of management issues with the existing open MPI_Request handles, which had to be waited on or otherwise managed.

My second approach used a central root rank as proposed by Zulan. It was very quick when there weren't many messages, but the root rank got clogged up when many solutions were found at the same time, making it very slow in special cases. Most importantly, the root rank was no longer as fast as the others (which was to be expected), and that resulted in an overall slower program in my case.

My third and final approach uses multiple non-blocking MPI_Ibcast() calls, one for every possible rank that could send a message. For example, in a case with 3 different ranks this means that

  • rank 0 has two open MPI_Ibcasts with root 1 and 2
  • rank 1 has two open MPI_Ibcasts with root 0 and 2
  • rank 2 has two open MPI_Ibcasts with root 0 and 1

When a rank then finds a solution, it posts the last missing MPI_Ibcast with itself as the root. At first this may seem similar to the first approach; however, here the sender only has to monitor a single request handle (the one from its own MPI_Ibcast) and check it periodically with MPI_Test(). The other ranks always have the same number of open broadcasts (namely the world size minus 1), which can be stored in an array and checked with MPI_Testany(). The difficulty with this approach, however, is that each open broadcast must operate on its own communicator, so you essentially need as many communicators as there are ranks.
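
A sketch of that setup (function and variable names are mine, only to illustrate the structure):

#include <stdlib.h>
#include <mpi.h>

static int rank, size;
static MPI_Comm *comms;      /* one duplicated communicator per possible root */
static MPI_Request *reqs;    /* one open MPI_Ibcast per *other* rank          */
static long long *bufs;      /* receive buffer for each of those broadcasts   */

/* Post a listening MPI_Ibcast for every rank except ourselves. */
void setup_listeners(void)
{
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    comms = malloc(size * sizeof(MPI_Comm));
    reqs  = malloc(size * sizeof(MPI_Request));
    bufs  = malloc(size * sizeof(long long));
    for (int root = 0; root < size; ++root) {
        MPI_Comm_dup(MPI_COMM_WORLD, &comms[root]);
        if (root != rank)
            MPI_Ibcast(&bufs[root], 1, MPI_LONG_LONG, root,
                       comms[root], &reqs[root]);
        else
            reqs[root] = MPI_REQUEST_NULL;  /* placeholder, ignored by MPI_Testany */
    }
}

/* Called by the rank that found `value`: post the one broadcast that is still
 * missing, with ourselves as root. The caller keeps checking *my_request with
 * MPI_Test() until it completes. */
void announce(long long value, MPI_Request *my_request)
{
    bufs[rank] = value;
    MPI_Ibcast(&bufs[rank], 1, MPI_LONG_LONG, rank, comms[rank], my_request);
}

/* Called periodically by every rank: did any other rank broadcast something? */
int poll_listeners(long long *value, int *source)
{
    int index, flag;
    MPI_Testany(size, reqs, &index, &flag, MPI_STATUS_IGNORE);
    if (flag && index != MPI_UNDEFINED) {
        *value  = bufs[index];
        *source = index;
        return 1;
    }
    return 0;
}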

In summary: the second approach is sometimes acceptable and is a quick, viable option when there are not many messages; it was the easiest to handle and implement. The third approach was faster under heavier load and also made program termination very easy, since there is always a known number of open requests at all times. It was the hardest and longest to implement and used a bit more memory than the other implementations (I don't know the exact reason, but it seems to have something to do with the broadcasts' buffer management and possibly the additional communicators). Lastly, I cannot stress enough that the first approach looks easy at first, but once you really have to keep track of every single request, it becomes hard to manage and is slow as well, unfortunately.
