Efficiently gathering/scattering tasks

Published 2019-05-07 04:25

Question:

The MPI implementation I am working with does not natively support fully multithreaded operation (the highest supported level is MPI_THREAD_SERIALIZED, for complicated reasons), so I am trying to funnel the requests from multiple threads into a single worker thread and then scatter the results back out to the requesting threads.

I can handle gathering the local request tasks easily enough with a concurrent queue, and MPI natively supports posting asynchronous (nonblocking) operations. However, the problem is getting the two sides to talk to each other:

In order to scatter the responses back to the various threads, I need to call something like MPI_Waitany on the current in-flight requests, but during that time the MPI worker is effectively blocked, so it can't gather and submit any new tasks from the local workers.

// mpi worker thread
std::vector<MPI_Request> requests; // in-flight requests
while(keep_running)
{
    if(queue.has_tasks_available())
    {
        MPI_Request r;
        // task only performs asynchronous MPI calls, puts result in r
        queue.pop_and_run(task, &r);
        requests.push_back(r);
    }
    int idx;
    MPI_Waitany(requests.size(), requests.data(), &idx,
                MPI_STATUS_IGNORE); // problems here! can't get any new tasks
    dispatch_mpi_result(idx); // notifies other task that its response is ready
    // ... erase the freed MPI_Request from requests
}

Similarly, if I just have the MPI worker wait for new tasks from the concurrent queue and then poll the MPI responses with something like MPI_Testany, then at best responses will take a really long time to make it back to the local workers, and at worst the MPI worker will deadlock, since it is waiting for local tasks while all the local tasks are waiting for MPI responses.

// mpi worker thread
std::vector<MPI_Request> requests; // in-flight requests
while(keep_running)
{
    queue.wait_for_available_task(); // problem here! might deadlock if no new tasks happen to be submitted
    MPI_Request r;
    queue.pop_and_run(task, &r);
    requests.push_back(r);
    int idx, flag;
    MPI_Testany(requests.size(), requests.data(), &idx, &flag, MPI_STATUS_IGNORE);
    if(flag && idx != MPI_UNDEFINED)
        dispatch_mpi_result(idx); // notifies other task that its response is ready
    // ... erase the freed MPI_Request from requests
}

The only solution I can see that solves both of these issues is to have the MPI worker poll both sides without ever blocking, but this means I have a perpetually pegged thread just to handle requests:

// mpi worker thread
std::vector<MPI_Request> requests; // in-flight requests
while(keep_running)
{
    if(queue.has_tasks_available())
    {
        MPI_Request r;
        // task only performs asynchronous MPI calls, puts result in r
        queue.pop_and_run(task, &r);
        requests.push_back(r);
    }
    int idx, flag;
    MPI_Testany(requests.size(), requests.data(), &idx, &flag, MPI_STATUS_IGNORE);
    if(flag && idx != MPI_UNDEFINED)
        dispatch_mpi_result(idx); // notifies other task that its response is ready
    // ... erase the freed MPI_Request from requests
}

I could introduce some sort of sleep function, but that seems like a hack and would reduce my throughput. Is there some other solution to this starvation/inefficiency problem?

Answer 1:

I'm afraid you are already close to the best you can do with your final solution: looping around, checking for new tasks from your local threads and calling MPI_Testany (or better, MPI_Testsome).

One thing you can do is dedicate an entire core to this loop. The advantages are that it is simple, has low latency, and gives predictable performance. On a modern HPC system, which will typically have more than 20 cores, that is less than 5 % overhead. If your application is memory bound, the overhead may even be negligible. Unfortunately this wastes CPU cycles and energy. A minor modification is to introduce usleep in the loop; you will then have to tune the sleep time to balance utilization and latency.
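
For concreteness, here is a minimal sketch of such a loop using MPI_Testsome plus an optional usleep back-off. TaskQueue, pop_and_run and dispatch_mpi_result are hypothetical stand-ins for the interfaces in the question, not a real API, and the sleep interval is just a tuning knob:

// A sketch of the polling loop suggested above (dedicated core, MPI_Testsome,
// optional usleep back-off). The queue interface and dispatch_mpi_result are
// assumed names mirroring the question's code.
#include <mpi.h>
#include <unistd.h>     // usleep
#include <algorithm>    // std::remove
#include <atomic>
#include <vector>

struct TaskQueue                                    // hypothetical queue interface
{
    bool has_tasks_available();
    void pop_and_run(MPI_Request* r);               // task posts an asynchronous MPI call into r
};
void dispatch_mpi_result(int idx);                  // hypothetical: wakes the waiting local task

void mpi_worker_loop(TaskQueue& queue, std::atomic<bool>& keep_running)
{
    std::vector<MPI_Request> requests;              // in-flight requests
    const useconds_t sleep_us = 100;                // tune: CPU utilization vs. latency

    while(keep_running.load())
    {
        // 1) Drain everything the local threads have queued; each task posts
        //    an asynchronous MPI operation and hands back its request.
        while(queue.has_tasks_available())
        {
            MPI_Request r;
            queue.pop_and_run(&r);
            requests.push_back(r);
        }

        // 2) Complete as many in-flight requests as possible in a single call.
        if(!requests.empty())
        {
            std::vector<int> indices(requests.size());
            int outcount = 0;
            MPI_Testsome(static_cast<int>(requests.size()), requests.data(),
                         &outcount, indices.data(), MPI_STATUSES_IGNORE);
            for(int i = 0; i < outcount; ++i)
                dispatch_mpi_result(indices[i]);    // notify the thread waiting on this result

            // MPI_Testsome sets completed entries to MPI_REQUEST_NULL; compact the
            // vector (any index-to-task bookkeeping must be compacted the same way).
            requests.erase(std::remove(requests.begin(), requests.end(), MPI_REQUEST_NULL),
                           requests.end());
        }

        // 3) Optional back-off so the thread is not fully pegged; omit it if the
        //    thread owns a dedicated core and latency matters most.
        usleep(sleep_us);
    }
}

Setting sleep_us to zero gives the pure busy-polling variant; raising it trades latency for free CPU cycles.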

If you want to use all cores for the application, you have to be careful that the MPI thread does not steal CPU time from the computational threads. I assume your queue implementation is blocking, i.e. it does not busy-wait. That means the computational threads can give CPU time to the MPI thread whenever they are waiting for results. Unfortunately this may not be true for sending, as a worker can continue immediately after placing a task in the queue.
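
To make that assumption concrete, a blocking wait for a result could look like the following minimal sketch (ResultSlot and its methods are hypothetical names, not part of the question's code):

// A minimal sketch (hypothetical names) of what "blocking" means here: each
// computational thread sleeps on a condition variable until the MPI thread
// delivers its result, giving up its core in the meantime.
#include <condition_variable>
#include <mutex>

struct ResultSlot
{
    std::mutex              m;
    std::condition_variable cv;
    bool                    ready = false;

    void wait_for_result()              // called by a computational thread
    {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return ready; });
    }

    void deliver()                      // called by the MPI thread, e.g. from dispatch_mpi_result
    {
        {
            std::lock_guard<std::mutex> lock(m);
            ready = true;
        }
        cv.notify_one();
    }
};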

What you can do is increase the nice level (reduce the priority) of your MPI thread so that it primarily runs when the computational threads are waiting for results. You can also use sched_yield within the loop to give a little hint to the scheduler. While both are defined in POSIX, their semantics are very weak and depend strongly on the actual scheduler implementation. It is generally not a good idea to implement a busy-waiting loop with sched_yield, but you have no real alternative here. OpenMPI and MPICH implement similar loops in some cases.
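
A minimal sketch of what this could look like on Linux; note that per-thread setpriority() behaviour is Linux/NPTL specific rather than portable POSIX, and the nice value of 10 is an arbitrary choice:

// Sketch: de-prioritise the MPI worker thread so it mostly runs while the
// computational threads are blocked waiting for results. The caveats above
// apply: the effect of sched_yield() depends entirely on the scheduler.
#include <atomic>
#include <sched.h>          // sched_yield
#include <sys/resource.h>   // setpriority, PRIO_PROCESS

void mpi_worker_thread_main(std::atomic<bool>& keep_running)
{
    // Raise the nice value (lower the priority) of this thread only.
    setpriority(PRIO_PROCESS, 0, 10);   // Linux: who == 0 means the calling thread

    while(keep_running.load())
    {
        // ... gather local tasks and MPI_Testsome the in-flight requests,
        //     exactly as in the polling loop sketched earlier ...

        sched_yield();   // hint: let a runnable computational thread go first
    }
}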

The impact of the additional MPI thread depends on how tightly coupled your computational threads are. For example, if they frequently meet at a barrier, it can severely degrade performance, since delaying just a single thread delays all of them.

In the end, if you want your implementation to be efficient, you will have to measure and tune it for a specific system.



Answer 2:

I have a solution that avoids busy-waiting (with or without sleeping), but it comes with its own cost: you'll need a separate MPI process to help manage the queue, and every other MPI process that wants to make requests from multiple threads must be able to talk to this process via some other IPC channel (e.g. a socket). Note that the latter restriction somewhat (but, I would argue, not totally) obviates the usefulness of MPI in the first place. The basic idea is this: the principal obstacle to multithreaded MPI happiness is the impossibility of having a thread block on either of two different flavours of IPC when one of those flavours is MPI. We can get around that by using a separate MPI "forwarder" process to "convert" another form of IPC request into an ordinary MPI request and send it back to the originating process, where it can be picked up by that process's "MPI listener" thread and acted on.

Your MPI program should consist of the following processes and threads:

  1. A special "forwarder" process that has just a single thread that stays in an endless loop blocking on accept(). (I'll use sockets as the example alternative IPC mechanism here; others would work in a similar way.) After each accept() call completes, it reads an encoded request from the socket that contains, among other things, the process ID of the requesting process. It then immediately makes a (synchronous) MPI_Send() to that process ID, sending the encoded request to it, and starts blocking on accept() again. (A minimal sketch of this loop follows the list.)
  2. Any number of other processes that each have:
    • An "MPI listener" thread that stays in an endless loop blocking on MPI_Waitany() (sketched at the end of this answer), which can receive two different kinds of request messages:
      1. Ordinary "incoming" requests from other processes to do something, which should be processed however you currently process them, and
      2. Requests from the "forwarder" process that represent "outgoing" requests initiated by other threads in this same process, which should be handled by posting an asynchronous send (MPI_Isend()) to the destination MPI process identified in the encoded request.
    • Any number of worker threads, which, whenever they need to make a request, make a socket connection to the forwarder process, transmit the encoded request, and then close the socket.
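
Here is a minimal sketch of the forwarder process's main loop, assuming sockets as the secondary IPC channel and a simple, made-up wire format: the worker prefixes its own rank and the payload size, and the payload itself begins with the destination rank (which the listener thread consumes later). The tag, port and framing are assumptions, and error handling is omitted:

// Sketch of the "forwarder" process: accept a connection, read one encoded
// request, forward it to the originating rank as an ordinary MPI message,
// and go back to accept(). Constants and framing are hypothetical.
#include <mpi.h>
#include <arpa/inet.h>      // htons, htonl
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>         // read, close
#include <cstdint>
#include <vector>

const int      FORWARDED_REQUEST_TAG = 42;   // assumption: tag reserved for forwarded requests
const uint16_t FORWARDER_PORT        = 5555; // assumption: port the worker threads connect to

// Read exactly len bytes, looping over short reads (sketch: errors just stop the read).
static void read_exact(int fd, void* buf, size_t len)
{
    char* p = static_cast<char*>(buf);
    while(len > 0)
    {
        ssize_t n = read(fd, p, len);
        if(n <= 0) break;
        p += n;
        len -= static_cast<size_t>(n);
    }
}

void forwarder_main_loop()
{
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    sockaddr_in addr{};
    addr.sin_family      = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port        = htons(FORWARDER_PORT);
    bind(listen_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
    listen(listen_fd, 16);

    while(true)
    {
        int conn_fd = accept(listen_fd, nullptr, nullptr);   // block for the next request

        // Assumed framing: [int32 origin_rank][int32 payload_bytes][payload ...]
        int32_t origin_rank = 0, payload_bytes = 0;
        read_exact(conn_fd, &origin_rank, sizeof(origin_rank));
        read_exact(conn_fd, &payload_bytes, sizeof(payload_bytes));
        std::vector<char> payload(payload_bytes);
        read_exact(conn_fd, payload.data(), payload.size());
        close(conn_fd);

        // Synchronously hand the encoded request back to the originating process
        // as an ordinary MPI message; its listener thread will pick it up.
        MPI_Send(payload.data(), payload_bytes, MPI_BYTE,
                 origin_rank, FORWARDED_REQUEST_TAG, MPI_COMM_WORLD);
    }
}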

Clearly the forwarder process's synchronous processing of requests represents a bottleneck in the system, but it's easy to scale up simply by adding more forwarder processes that behave in the exact same way, and having worker processes choose which forwarder process to "ask" randomly.

One possible optimisation is to have the forwarder process send the "converted" request directly to the destination MPI process, instead of back to the process that originated it.
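
For completeness, here is a sketch of the per-process "MPI listener" thread from the list above (without that last optimisation). The tags, message-size bound, payload layout and handle_incoming_request are all assumptions chosen to match the forwarder sketch, not a prescribed format:

// Sketch of the per-process "MPI listener" thread. It keeps one receive posted
// per message kind, blocks in MPI_Waitany, and re-sends forwarded "outgoing"
// requests asynchronously to the destination rank encoded in the payload.
#include <mpi.h>
#include <cstdint>
#include <cstring>
#include <list>
#include <utility>
#include <vector>

const int INCOMING_REQUEST_TAG  = 41;   // assumption: ordinary requests between ranks
const int FORWARDED_REQUEST_TAG = 42;   // assumption: requests bounced back by the forwarder
const int MAX_MSG_BYTES         = 4096; // assumption: upper bound on an encoded request

// Hypothetical: however the process currently handles ordinary incoming requests.
void handle_incoming_request(const std::vector<char>& msg, const MPI_Status& status);

void mpi_listener_thread_main()
{
    // One posted receive per message kind; the thread blocks in MPI_Waitany below.
    std::vector<char> incoming_buf(MAX_MSG_BYTES), forwarded_buf(MAX_MSG_BYTES);
    MPI_Request recvs[2];
    MPI_Irecv(incoming_buf.data(), MAX_MSG_BYTES, MPI_BYTE, MPI_ANY_SOURCE,
              INCOMING_REQUEST_TAG, MPI_COMM_WORLD, &recvs[0]);
    MPI_Irecv(forwarded_buf.data(), MAX_MSG_BYTES, MPI_BYTE, MPI_ANY_SOURCE,
              FORWARDED_REQUEST_TAG, MPI_COMM_WORLD, &recvs[1]);

    // Outgoing asynchronous sends still in flight: (payload copy, request).
    std::list<std::pair<std::vector<char>, MPI_Request>> pending_sends;

    while(true)
    {
        int which;
        MPI_Status status;
        MPI_Waitany(2, recvs, &which, &status);      // block until either kind arrives

        if(which == 0)                               // ordinary incoming request
        {
            handle_incoming_request(incoming_buf, status);
            MPI_Irecv(incoming_buf.data(), MAX_MSG_BYTES, MPI_BYTE, MPI_ANY_SOURCE,
                      INCOMING_REQUEST_TAG, MPI_COMM_WORLD, &recvs[0]);
        }
        else                                         // "outgoing" request via the forwarder
        {
            int count = 0;
            MPI_Get_count(&status, MPI_BYTE, &count);
            // Assumed payload layout: [int32 destination_rank][encoded request ...]
            int32_t dest_rank = 0;
            std::memcpy(&dest_rank, forwarded_buf.data(), sizeof(dest_rank));

            // Copy the payload so the receive buffer can be reposted while the
            // asynchronous send is still in flight.
            pending_sends.emplace_back(
                std::vector<char>(forwarded_buf.begin() + sizeof(dest_rank),
                                  forwarded_buf.begin() + count),
                MPI_Request());
            MPI_Isend(pending_sends.back().first.data(),
                      count - static_cast<int>(sizeof(dest_rank)), MPI_BYTE,
                      dest_rank, INCOMING_REQUEST_TAG, MPI_COMM_WORLD,
                      &pending_sends.back().second);

            MPI_Irecv(forwarded_buf.data(), MAX_MSG_BYTES, MPI_BYTE, MPI_ANY_SOURCE,
                      FORWARDED_REQUEST_TAG, MPI_COMM_WORLD, &recvs[1]);
        }

        // Reap completed outgoing sends so pending_sends does not grow forever.
        for(auto it = pending_sends.begin(); it != pending_sends.end(); )
        {
            int done = 0;
            MPI_Test(&it->second, &done, MPI_STATUS_IGNORE);
            it = done ? pending_sends.erase(it) : ++it;
        }
    }
}

Note that in this design the listener thread is the only thread in its process that makes MPI calls (the worker threads talk to the forwarder over sockets), so MPI_THREAD_SERIALIZED is sufficient.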