I want to find the first k results over all nodes using MPI. For that I wanted to use MPI_Reduce with a custom reduction function. However, my code does not work, because the len parameter passed to the function is not the same as the count parameter given to MPI_Reduce.
I found here that implementations may do this to pipeline the computation.
My code is similar to this one:
inline void MPI_user_select_top_k(acctbal_pair *invec, acctbal_pair *inoutvec, int *len, MPI_Datatype *dptr) {
    // Merge both input arrays, sort by acctbal descending, keep the first *len.
    std::vector<acctbal_pair> temp;
    temp.reserve(2 * *len);
    for (int i = 0; i < *len; ++i) {
        temp.push_back(invec[i]);
        temp.push_back(inoutvec[i]);
    }
    std::sort(temp.begin(), temp.end(),
              [](const acctbal_pair &a, const acctbal_pair &b) { return a.acctbal > b.acctbal; });
    for (int i = 0; i < *len; ++i)
        inoutvec[i] = temp[i];
}
Here acctbal_pair is a struct with the fields suppkey and acctbal.
I call MPI_Reduce like this, where localResults and globalResults are vectors of size k:
MPI_Reduce(localResults.data(), globalResults.data(), k, mpi_suppkey_acctbal, select_top_k, ROOT, MPI_COMM_WORLD);
However, for slightly larger values of k, the count gets split into smaller chunks, making my function fail.
Is there any way to tell MPI_Reduce not to pipeline the computation? Or do you know another (efficient) way to implement this? I really don't want to use an MPI_Gather and find the first k results on the root, because of the large communication overhead.
I cannot just create the function with a fixed parameter k (treating all k elements as one MPI type), as k is computed at runtime.
I know that this is not the purpose of MPI_Reduce (which should just compute some operation element-wise), but my approach works perfectly as long as count is not chunked.
P.S.: my MPI implementation is Open MPI.
Sure, you can do this - you just need to create a type of size k (which is easy enough to do at runtime) and do the selection with a count of 1. The only trick is that you have no way to pass state (e.g., k) to the selection operation, so you need to communicate it via a global variable - which is obviously not great, but one does what one needs to do. If you need to run the algorithm repeatedly with different values of k, you just create the type as needed and reset the global variable.
(You can avoid the global variable if you sneak the value of k into the selection operation some other way - say, by making the first element of each array passed to it the value k.) Below is some code that does this; it allows for the case where a processor has fewer than k values. Each processor selects its k minimum values and stuffs them into the local array, and then the selection operation does a partial sorted-merge to pick off just the k least elements.
Compiling and running gives:
(Version without global variable follows:)