Just looking over some notes prior to an interview and am struggling to understand how Odd-Even sort works in parallel architectures.
int MPI_OddEven_Sort(int n, double *a, int root, MPI_Comm comm)
{
int rank, size, i, sorted_result;
double *local_a;
// get rank and size of comm
MPI_Comm_rank(comm, &rank); //&rank = address of rank
MPI_Comm_size(comm, &size);
local_a = (double *) calloc(n / size, sizeof(double));
// scatter the array a to local_a
MPI_Scatter(a, n / size, MPI_DOUBLE, local_a, n / size, MPI_DOUBLE,
root, comm);
// sort local_a
merge_sort(n / size, local_a);
//odd-even part
for (i = 0; i < size; i++) {
if ((i + rank) % 2 == 0) { // means i and rank have same nature
if (rank < size - 1) {
MPI_Compare(n / size, local_a, rank, rank + 1, comm);
}
} else if (rank > 0) {
MPI_Compare(n / size, local_a, rank - 1, rank, comm);
}
MPI_Barrier(comm);
// test if array is sorted
MPI_Is_Sorted(n / size, local_a, root, comm, &sorted_result);
// is sorted gives integer 0 or 1, if 0 => array is sorted
if (sorted_result == 0) {
break;
} // check for iterations
}
// gather local_a to a
MPI_Gather(local_a, n / size, MPI_DOUBLE, a, n / size, MPI_DOUBLE,
root, comm);
return MPI_SUCCESS;
}
is some code I wrote for this function (not today nor yesterday!). Can someone please break down how it works?
I'm scattering my array a to each processor, which is getting a copy of local_a (which is of size n/size).
Merge sort is being called on each local_a.
What is going on after this? (Assuming I am correct so far!)
It's sort of fun to see these PRAM-type sorting networks popping up again after all these years. The original mental model of parallel computing for these things was massively parallel arrays of tiny processors acting as "comparators", e.g. the Connection Machines, back in the day when networking was cheap compared to CPU/RAM. Of course that ended up looking very different from the supercomputers of the mid to late 80s and on, and even more so from the x86 clusters of the late 90s on; but now they're starting to come back in vogue with GPUs and other accelerators, which actually do look a bit like that future past if you squint.
It looks like what you have above is something more like a Baudet-Stevenson odd-even sort, which was already starting to move in the direction of assuming that the processors would have multiple items stored locally and you could make good use of the processors by sorting those local lists in between communication steps.
Fleshing out your code and simplifying it a bit, we have something like this:
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
int merge(double *ina, int lena, double *inb, int lenb, double *out) {
int i,j;
int outcount=0;
for (i=0,j=0; i<lena; i++) {
while (j < lenb && inb[j] < ina[i]) { // check the bound before reading inb[j]
out[outcount++] = inb[j++];
}
out[outcount++] = ina[i];
}
while (j<lenb)
out[outcount++] = inb[j++];
return 0;
}
int domerge_sort(double *a, int start, int end, double *b) {
if ((end - start) <= 1) return 0;
int mid = (end+start)/2;
domerge_sort(a, start, mid, b);
domerge_sort(a, mid, end, b);
merge(&(a[start]), mid-start, &(a[mid]), end-mid, &(b[start]));
for (int i=start; i<end; i++)
a[i] = b[i];
return 0;
}
int merge_sort(int n, double *a) {
double b[n];
domerge_sort(a, 0, n, b);
return 0;
}
void printstat(int rank, int iter, char *txt, double *la, int n) {
printf("[%d] %s iter %d: <", rank, txt, iter);
for (int j=0; j<n-1; j++)
printf("%6.3lf,",la[j]);
printf("%6.3lf>\n", la[n-1]);
}
void MPI_Pairwise_Exchange(int localn, double *locala, int sendrank, int recvrank,
MPI_Comm comm) {
/*
* the sending rank just sends the data and waits for the results;
* the receiving rank receives it, sorts the combined data, and returns
* the correct half of the data.
*/
int rank;
double remote[localn];
double all[2*localn];
const int mergetag = 1;
const int sortedtag = 2;
MPI_Comm_rank(comm, &rank);
// use the communicator passed in (the one rank was queried on), not MPI_COMM_WORLD
if (rank == sendrank) {
MPI_Send(locala, localn, MPI_DOUBLE, recvrank, mergetag, comm);
MPI_Recv(locala, localn, MPI_DOUBLE, recvrank, sortedtag, comm, MPI_STATUS_IGNORE);
} else {
MPI_Recv(remote, localn, MPI_DOUBLE, sendrank, mergetag, comm, MPI_STATUS_IGNORE);
merge(locala, localn, remote, localn, all);
int theirstart = 0, mystart = localn;
if (sendrank > rank) {
theirstart = localn;
mystart = 0;
}
MPI_Send(&(all[theirstart]), localn, MPI_DOUBLE, sendrank, sortedtag, comm);
for (int i=mystart; i<mystart+localn; i++)
locala[i-mystart] = all[i];
}
}
int MPI_OddEven_Sort(int n, double *a, int root, MPI_Comm comm)
{
int rank, size, i;
double *local_a;
// get rank and size of comm
MPI_Comm_rank(comm, &rank); //&rank = address of rank
MPI_Comm_size(comm, &size);
local_a = (double *) calloc(n / size, sizeof(double));
// scatter the array a to local_a
MPI_Scatter(a, n / size, MPI_DOUBLE, local_a, n / size, MPI_DOUBLE,
root, comm);
// sort local_a
merge_sort(n / size, local_a);
//odd-even part
for (i = 1; i <= size; i++) {
printstat(rank, i, "before", local_a, n/size);
if ((i + rank) % 2 == 0) { // means i and rank have same nature
if (rank < size - 1) {
MPI_Pairwise_Exchange(n / size, local_a, rank, rank + 1, comm);
}
} else if (rank > 0) {
MPI_Pairwise_Exchange(n / size, local_a, rank - 1, rank, comm);
}
}
printstat(rank, i-1, "after", local_a, n/size);
// gather local_a to a
MPI_Gather(local_a, n / size, MPI_DOUBLE, a, n / size, MPI_DOUBLE,
root, comm);
if (rank == root)
printstat(rank, i, " all done ", a, n);
free(local_a); // release the per-rank buffer allocated with calloc
return MPI_SUCCESS;
}
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
int n = argc-1;
double a[n];
for (int i=0; i<n; i++)
a[i] = atof(argv[i+1]);
MPI_OddEven_Sort(n, a, 0, MPI_COMM_WORLD);
MPI_Finalize();
return 0;
}
So the way this works is that the list is evenly split up between processors (non-equal distributions are easily handled too, but it's a lot of extra bookkeeping which doesn't add much to this discussion).
We first sort our local lists (which is O(n/P ln n/P)). There's no reason it has to be a merge sort, of course, except that here we can re-use that merge code in the following steps. Then we do P neighbour exchange steps, half in each direction. The model here was that there was a linear network where we could communicate directly and quickly with immediate neighbours, and perhaps not at all with neighbours further away.
The original odd-even sorting network is the case where each processor has one key, in which case the communication is easy: you compare your item with your neighbour's, and swap if necessary (so that this is basically a parallel bubble sort). In this case, we do a simple parallel sort between pairs of processes: each pair sends all of its data to one member of the pair, that process merges the two already locally sorted lists in O(n/P), and then gives the appropriate half of the data back to the other process. I took out your check-if-done; it can be shown that the sort completes in P neighbour exchanges. You can certainly add it back in to allow early termination; however, all the processors have to agree when everything's done, which requires something like an allreduce, which breaks the original model somewhat.
So we have O(n) data transfer per link (sending and receiving n/P items P times each), and each processor does (n/P ln n/P) + (2 n/P - 1)*P/2 = O(n/P ln n/P + n) comparisons; in this case there's a scatter and a gather to be considered as well, but in general this sort is done with data in place.
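As a rough worked instance of that count, take n = 12 keys on P = 4 processes (3 keys each). The local-sort term is only an order-of-magnitude estimate, so the total is indicative rather than an exact tally; the snippet assumes amsmath for `\text`.

```latex
\[
\underbrace{\frac{n}{P}\ln\frac{n}{P}}_{\text{local sort}}
+ \underbrace{\left(\frac{2n}{P}-1\right)\frac{P}{2}}_{\text{merges}}
= \frac{n}{P}\ln\frac{n}{P} + n - \frac{P}{2}
\]
\[
n = 12,\; P = 4:\qquad 3\ln 3 + (6-1)\cdot 2 \approx 3.3 + 10 = 13.3
\]
```

The merge term simplifies to n - P/2, which is where the O(n) part of the bound comes from.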
Running the above on the same example used in the linked document gives (with output re-ordered to make it easier to read):
$ mpirun -np 4 ./baudet-stevenson 43 54 63 28 79 81 32 47 84 17 25 49
[0] before iter 1: <43.000,54.000,63.000>
[1] before iter 1: <28.000,79.000,81.000>
[2] before iter 1: <32.000,47.000,84.000>
[3] before iter 1: <17.000,25.000,49.000>
[0] before iter 2: <43.000,54.000,63.000>
[1] before iter 2: <28.000,32.000,47.000>
[2] before iter 2: <79.000,81.000,84.000>
[3] before iter 2: <17.000,25.000,49.000>
[0] before iter 3: <28.000,32.000,43.000>
[1] before iter 3: <47.000,54.000,63.000>
[2] before iter 3: <17.000,25.000,49.000>
[3] before iter 3: <79.000,81.000,84.000>
[0] before iter 4: <28.000,32.000,43.000>
[1] before iter 4: <17.000,25.000,47.000>
[2] before iter 4: <49.000,54.000,63.000>
[3] before iter 4: <79.000,81.000,84.000>
[0] after iter 4: <17.000,25.000,28.000>
[1] after iter 4: <32.000,43.000,47.000>
[2] after iter 4: <49.000,54.000,63.000>
[3] after iter 4: <79.000,81.000,84.000>
[0] all done iter 5: <17.000,25.000,28.000,32.000,43.000,47.000,49.000,54.000,63.000,79.000,81.000,84.000>