MPI_Reduce select first k results

Posted 2019-08-08 06:07

Question:

I want to find the first k results over all nodes using MPI. For that I wanted to use MPI_Reduce with my own user-defined operation. However, my code does not work because the len parameter passed to the function is not the same as the count parameter given to MPI_Reduce.
I read that implementations are allowed to split the reduction like this in order to pipeline the computation.

My code is similar to this one:

#include <mpi.h>
#include <algorithm>
#include <vector>

// Keeps the *len pairs with the largest acctbal. This silently assumes *len == k,
// which no longer holds once the implementation splits the reduction into chunks.
inline void MPI_user_select_top_k(acctbal_pair *invec, acctbal_pair *inoutvec, int *len, MPI_Datatype *dtpr) {
    std::vector<acctbal_pair> temp;
    for(int i = 0; i < *len; ++i) {
        acctbal_pair p1 = {invec[i].suppkey, invec[i].acctbal};
        acctbal_pair p2 = {inoutvec[i].suppkey, inoutvec[i].acctbal};
        temp.push_back(p1);
        temp.push_back(p2);
    }
    std::sort(temp.begin(), temp.end(), [](const acctbal_pair &a, const acctbal_pair &b) { return a.acctbal > b.acctbal; });
    for(int i = 0; i < *len; ++i) {
        inoutvec[i].suppkey = temp[i].suppkey;
        inoutvec[i].acctbal = temp[i].acctbal;
    }
}

where acctbal_pair is a struct with the fields suppkey and acctbal.
I call MPI_Reduce like this, where localResults and globalResults are vectors of size k:

MPI_Reduce(localResults.data(), globalResults.data(), k, mpi_suppkey_acctbal, select_top_k, ROOT, MPI_COMM_WORLD);

However, for slightly larger values of k, the count gets split into smaller chunks, which makes my function fail.

Is there any way to tell MPI_Reduce not to pipeline the computation? Or do you know another (efficient) way to implement this? I really don't want to use an MPI_Gather and find the first k results on the root, because of the large communication overhead.

I cannot just create the function with a fixed parameter k (and treat all k elements as one MPI datatype), because k is computed at runtime.

I know that this is not the purpose of MPI_Reduce (which should just apply some operation element-wise), but it works perfectly as long as count is not chunked.

P.S.: My MPI implementation is Open MPI.

Answer 1:

Sure, you can do this. You just need to create a datatype holding k elements (which is easy enough to do at runtime) and do the selection on that. The user function always receives len as a number of elements of the datatype passed to MPI_Reduce, so each call sees complete blocks of k values. The only trick is that you have no way to pass state (e.g., k) to the selection operation, so you need to communicate it via a global variable. That is obviously not great, but one does what one needs to do. If you need to run the algorithm repeatedly with different values of k, just create the type as needed and reset the global variable.

(You can avoid the global variable by sneaking the value of k into the selection operation some other way; for example, by making the first element of each array the value of k.)

Below is some code that does this; it also handles the case where a process has fewer than k values. Each process selects its k smallest values and stores them in its local array, and the selection operation then does a partial sorted merge to pick off just the k smallest elements overall.

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <limits.h>
#include <math.h>

const int invalid_data = -1;
static int globalselectk;        /* yuk */

int min2(int a, int b) {
    if (b < a)
        return b;
    return a;
}

MPI_Datatype createtype(int selectk) {

    MPI_Datatype selectktype;
    MPI_Type_contiguous(selectk, MPI_INT, &selectktype);
    MPI_Type_commit(&selectktype);

    return selectktype;
}

void initselectk(int *d, size_t selectk) {
    for (int i=0; i<selectk; i++)
        d[i] = invalid_data;
}

void printselectk(int *d, size_t selectk) {
    printf("[");
    for (int i=0; i<selectk; i++)
        printf("%3d ",d[i]);
    printf("] ");
}

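/* number of valid entries at the front of d; the bound is checked first so d[selectk] is never read */
int countselectk(int *d, size_t selectk) {
    int count = 0;

    while ( (count < selectk) && (d[count] != invalid_data) )
        count++;

    return count;
}
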
int mergeselect(int *d1, int *d2, int *dout, size_t selectk) {
    int count1 = countselectk(d1, selectk);
    int count2 = countselectk(d2, selectk);

    int count = 0;
    int total = count1+count2;
    if (total >= selectk) total = selectk;

    int idx1=0, idx2=0;

    while (count < total) {
        int minloc = -1;
        int minval = INT_MAX;
        if (idx1 < count1) {
            minloc = 1;
            minval = d1[idx1];
        }
        if ( (idx2 < count2) && (d2[idx2] < minval ) ) {
            minloc = 2;
            minval = d2[idx2];
        }
        dout[count++] = minval;
        if (minloc == 1)
            idx1++;
        else
            idx2++;
    }
    return count;
}

/* user-defined op: *len counts whole blocks of globalselectk ints, each merged on its own */
void selectop(void *in, void *inout, int *len, MPI_Datatype *type) {
    int *invals = (int *)in;
    int *inoutvals = (int *)inout;

    int out[globalselectk];

    for (int i=0; i<*len; i++) {
        initselectk(out, globalselectk);
        int count = mergeselect(invals, inoutvals, out, globalselectk);

        for (int j=0; j<count; j++)
            inoutvals[j] = out[j];

        invals += globalselectk;
        inoutvals += globalselectk;
    }

    return;
}

int intcompar(const void *v1, const void *v2) {
    int *i1 = (int *)v1;
    int *i2 = (int *)v2;

    return (*i1 - *i2);
}

int main(int argc, char **argv) {

    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (argc < 3) {
        fprintf(stderr,"Usage: %s localn k-to-select\n", argv[0]);
        MPI_Abort(MPI_COMM_WORLD,1);
    }

    int locn    = atoi(argv[1]);
    int selectk = atoi(argv[2]);
    globalselectk = selectk;     /* yuk */

    int localdata[locn];
    int local[selectk], global[selectk];

    /* create our new data type */
    MPI_Datatype mpi_datatype = createtype(selectk);

    MPI_Op mpi_selectop;
    MPI_Op_create(selectop, 1, &mpi_selectop);

    srand(rank*37);
    for (int i=0; i<locn; i++)
        localdata[i] = floor(500.*rand()/RAND_MAX);
    /* get our local k selected */
    /* could use quickselect for this, but to focus on the MPI, let's just sort */

    initselectk(local, selectk);
    qsort(localdata, locn, sizeof(int), intcompar);
    for (int i=0; i<min2(selectk,locn); i++)
        local[i] = localdata[i];

    for (int proc=0; proc<size; proc++) {
        if (rank == proc) {
            printf("Rank %2d has values: ",rank);
            for (int i=0; i<locn; i++)
                printf("%3d ", localdata[i]);
            printf("\n");
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    MPI_Reduce(local, global, 1, mpi_datatype, mpi_selectop, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        printf("Result is: \n");
        printselectk(global,selectk);
        printf("\n");
    }

    MPI_Op_free(&mpi_selectop);
    MPI_Type_free(&mpi_datatype);
    MPI_Finalize();

    return 0;
}

Compiling and running gives:

$ mpicc kselect.c -o kselect -Wall -std=c99 
$ mpirun -np 10 kselect 12 5
Rank  0 has values:  98 138 167 197 238 276 314 384 391 399 420 455 
Rank  1 has values:  16  87 119 134 156 164 225 299 321 380 409 441 
Rank  2 has values:  22  81 155 219 285 295 330 342 364 399 435 499 
Rank  3 has values:   3   7  75 164 181 271 285 358 379 438 466 491 
Rank  4 has values:   7  63  74 132 173 178 197 244 304 337 352 457 
Rank  5 has values:  21  62 104 138 240 346 377 382 411 446 455 482 
Rank  6 has values:  19  90 142 231 246 269 281 307 331 380 413 451 
Rank  7 has values:  43 191 193 232 236 331 399 429 439 445 446 457 
Rank  8 has values:  10 111 128 165 277 277 371 394 413 438 443 470 
Rank  9 has values:   2   2  34  57  97 105 128 187 265 329 344 409 
Result is: 
[  2   2   3   7   7 ] 

(A version without the global variable follows. It stores k in the first element of each array, so the datatype holds k+1 ints:)

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <limits.h>
#include <math.h>
#include <assert.h>

const int invalid_data = -1;

int min2(int a, int b) {
    if (b < a)
        return b;
    return a;
}

MPI_Datatype createtype(int selectk) {
    MPI_Datatype selectktype;
    MPI_Type_contiguous(selectk, MPI_INT, &selectktype);
    MPI_Type_commit(&selectktype);

    return selectktype;
}

/* layout: d[0] holds k, d[1..k] hold the selected values (or invalid_data) */
void initselectk(int *d, int selectk) {
    d[0] = selectk;
    for (int i=1; i<selectk+1; i++)
        d[i] = invalid_data;
}

void printselectk(int *d) {
    int selectk = d[0];
    printf("[");
    for (int i=1; i<selectk+1; i++) 
        printf("%3d ",d[i]);
    printf("] ");
}

int countselectk(int *d) {
    int selectk = d[0];
    int count = 0;
    d++;

    /* check the bound first so d[selectk] (one past the end) is never read */
    while ( (count < selectk) && (d[count] != invalid_data) )
        count++;

    return count;
}

int mergeselect(int *d1, int *d2, int *dout) {
    int selectk = d1[0];
    assert(selectk == d2[0]);
    dout[0] = selectk;
    dout++;

    int count1 = countselectk(d1);
    int count2 = countselectk(d2);
    int total = count1 + count2;
    if (total >= selectk) total = selectk;

    int count = 0;
    int idx1=1, idx2=1;

    while (count < total) {
        int minloc = -1;
        int minval = INT_MAX;
        if (idx1 <= count1) {
            minloc = 1;
            minval = d1[idx1];
        }
        if ( (idx2 <= count2) && (d2[idx2] < minval ) ) {
            minloc = 2;
            minval = d2[idx2];
        } 
        dout[count++] = minval;
        if (minloc == 1)
            idx1++;
        else
            idx2++; 
    }
    return count;
}

void selectop(void *in, void *inout, int *len, MPI_Datatype *type) {
    int *invals = (int *)in;
    int *inoutvals = (int *)inout;


    for (int i=0; i<*len; i++) {
        int selectk = invals[0];
        assert(selectk == inoutvals[0]);

        int out[selectk+1];
        initselectk(out, selectk);
        int count = mergeselect(invals, inoutvals, out);
        for (int j=1; j<=count; j++) 
            inoutvals[j] = out[j];

        invals += selectk+1;
        inoutvals += selectk+1;
    }

    return;
}

int intcompar(const void *v1, const void *v2) {
    int *i1 = (int *)v1;
    int *i2 = (int *)v2;

    return (*i1 - *i2);
}

int main(int argc, char **argv) {

    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (argc < 3) {
        fprintf(stderr,"Usage: %s localn k-to-select\n", argv[0]);
        MPI_Abort(MPI_COMM_WORLD,1);
    }

    int locn    = atoi(argv[1]);
    int selectk = atoi(argv[2]);

    int localdata[locn];
    int local[selectk+1], global[selectk+1];

    /* create our new data type */
    MPI_Datatype mpi_datatype = createtype(selectk+1);

    MPI_Op mpi_selectop;
    MPI_Op_create(selectop, 1, &mpi_selectop);

    srand(rank*37);
    for (int i=0; i<locn; i++) 
        localdata[i] = floor(500.*rand()/RAND_MAX);

    /* get our local k selected */
    /* could use quickselect for this, but to focus on the MPI, let's just sort */

    initselectk(local, selectk);
    qsort(localdata, locn, sizeof(int), intcompar);
    for (int i=0; i<min2(selectk,locn); i++) 
        local[i+1] = localdata[i];

    for (int proc=0; proc<size; proc++) {
        if (rank == proc) {
            printf("Rank %2d has values: ",rank);
            for (int i=0; i<locn; i++)
                printf("%3d ", localdata[i]);
            printf("\n");
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    MPI_Reduce(local, global, 1, mpi_datatype, mpi_selectop, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        printf("Result is: \n");
        printselectk(global);
        printf("\n");
    }

    MPI_Op_free(&mpi_selectop);
    MPI_Type_free(&mpi_datatype);
    MPI_Finalize();

    return 0;
}