How does the newly introduced Arrays.parallelPrefix(…) work?

Posted 2020-05-20 06:57

Question:

I came across Arrays.parallelPrefix introduced in Java 8.

This overloaded method performs an operation on each element of the input array in a cumulative fashion. For example, from the docs:

Cumulates, in parallel, each element of the given array in place, using the supplied function. For example if the array initially holds [2, 1, 0, 3] and the operation performs addition, then upon return the array holds [2, 3, 3, 6]. Parallel prefix computation is usually more efficient than sequential loops for large arrays.
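For reference, a minimal usage example of that documented behavior; Integer::sum serves as the required IntBinaryOperator:

import java.util.Arrays;

public class ParallelPrefixDemo {
    public static void main(String[] args) {
        int[] a = {2, 1, 0, 3};
        // cumulate in place using addition; may run in parallel for large arrays
        Arrays.parallelPrefix(a, Integer::sum);
        System.out.println(Arrays.toString(a));   // [2, 3, 3, 6]
    }
}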

So, how does Java achieve this task in parallel when the operation on each term depends on the result for the previous term, and so on?

I tried going through the code myself; they do use ForkJoinTasks, but it's not obvious how they merge the results to get the final array.

Answer 1:

As explained in Eran’s answer, this operation utilizes the associativity property of the function.

Then, there are two fundamental steps. The first one is an actual prefix operation (in the sense of requiring the previous element(s) for the evaluation), applied to parts of the array in parallel. The result of each partial operation (identical to its last element) is the offset for the remaining array.

E.g. for the following array, using sum as the prefix operation, with four processors

  4    9    5    1    0    5    1    6    6    4    6    5    1    6    9    3  

we get

  4 → 13 → 18 → 19    0 →  5 →  6 → 12    6 → 10 → 16 → 21    1 →  7 → 16 → 19  
                 ↓                   ↓                   ↓                   ↓  
                19                  12                  21                  19  

Now we utilize the associativity to apply the prefix operation to the offsets first:

                 ↓                   ↓                   ↓                   ↓  
                19         →        31         →        52         →        71  

Then we get to the second phase: applying these offsets to each element of the corresponding chunk. This is a perfectly parallelizable operation, as there is no dependency on the previous element(s) anymore:

                     19   19   19   19   31   31   31   31   52   52   52   52  
                      ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓  
  4   13   18   19   19   24   25   31   37   41   47   52   53   59   68   71  
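A minimal sketch of this two-phase strategy could look like the following; this is an illustrative method, not the actual JDK implementation (which uses ForkJoinTasks and a more elaborate work division), and for simplicity it assumes the array length is an exact multiple of the chunk count:

import java.util.stream.IntStream;

static void twoPhasePrefixSum(int[] a, int chunks) {
    int len = a.length / chunks;
    // phase 1: an ordinary sequential prefix sum within each chunk,
    // with the chunks processed in parallel
    IntStream.range(0, chunks).parallel().forEach(c -> {
        for (int i = c * len + 1; i < (c + 1) * len; i++) a[i] += a[i - 1];
    });
    // prefix the per-chunk totals (the last element of each chunk)
    // to obtain the offsets
    int[] offset = new int[chunks];
    for (int c = 1; c < chunks; c++) offset[c] = offset[c - 1] + a[c * len - 1];
    // phase 2: add each chunk's offset to all of its elements;
    // no dependencies between elements remain
    IntStream.range(1, chunks).parallel().forEach(c -> {
        for (int i = c * len; i < (c + 1) * len; i++) a[i] += offset[c];
    });
}

Running this on the sixteen-element array above with four chunks reproduces the final row of the diagram.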

When we use the same example for eight threads,

  4    9    5    1    0    5    1    6    6    4    6    5    1    6    9    3  

  4 → 13    5 →  6    0 →  5    1 →  7    6 → 10    6 → 11    1 →  7    9 → 12  
       ↓         ↓         ↓         ↓         ↓         ↓         ↓         ↓  
      13         6         5         7        10        11         7        12  

       ↓         ↓         ↓         ↓         ↓         ↓         ↓         ↓  
      13    →   19    →   24    →   31    →   41    →   52    →   59    →   71  

           13   13   19   19   24   24   31   31   41   41   52   52   59   59  
            ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓  
  4   13   18   19   19   24   25   31   37   41   47   52   53   59   68   71  

we see that there will be a clear benefit, even when we use the simpler strategy of keeping the work chunks the same for both steps, in other words, accepting one idle worker thread in the second phase. We will need about ⅛n for the first phase and ⅛n for the second, ¼n in total for the operation (where n is the cost of the sequential prefix evaluation of the entire array). Of course, this is only rough and in the best case.

In contrast, when we have only two processors

  4    9    5    1    0    5    1    6    6    4    6    5    1    6    9    3  


  4 → 13 → 18 → 19 → 19 → 24 → 25 → 31    6 → 10 → 16 → 21 → 22 → 28 → 37 → 40  
                                     ↓                                       ↓  
                                    31                                      40  

                                     ↓                                       ↓  
                                    31                   →                  71  

                                         31   31   31   31   31   31   31   31  
                                          ↓    ↓    ↓    ↓    ↓    ↓    ↓    ↓  
  4   13   18   19   19   24   25   31   37   41   47   52   53   59   68   71  

we can only gain a benefit when we re-assign the work of the second phase. This is possible because, as said, the second phase’s work has no dependencies between the elements anymore. So we can split this operation arbitrarily, though it complicates the implementation and may introduce additional overhead.

When we split the work of the second phase between both processors, the first phase needs about ½n and the second will need ¼n, yielding ¾n total, which still is a benefit, if the array is large enough.

As an additional note, you might notice that the offsets calculated in preparation for the second phase are identical to the result for the last element of each chunk. So you could reduce the required number of operations by one per chunk by simply assigning that value. But the typical scenario is to have only a few chunks (scaling with the number of processors), each with a large number of elements, so saving one operation per chunk is not relevant.



Answer 2:

The main point is that the operator is a

side-effect-free, associative function

This means that

(a op b) op c == a op (b op c)

Therefore, if you split the array into two halves and apply the parallelPrefix method recursively on each half, you can later merge the partial results by applying the operation to each element of the second half of the array with the last element of the first half.

Consider the [2, 1, 0, 3] example with addition. If you split the array into two halves and perform the operation on each half, you get:

[2, 3]    and    [0, 3]

Then, in order to merge them, you add 3 (the last element of the first half) to each element of the second half, and get:

[2, 3, 3, 6]
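A minimal recursive sketch of that idea; this is an illustrative helper, not the actual JDK code, and in a real fork/join version the two recursive calls would run as separate tasks:

import java.util.function.IntBinaryOperator;

static void prefix(int[] a, int from, int to, IntBinaryOperator op) {
    if (to - from < 2) return;          // a single element is already done
    int mid = (from + to) >>> 1;
    prefix(a, from, mid, op);           // left half,  e.g. [2, 1] -> [2, 3]
    prefix(a, mid, to, op);             // right half, e.g. [0, 3] -> [0, 3]
    // merge: combine the last element of the first half into each
    // element of the second half
    int last = a[mid - 1];
    for (int i = mid; i < to; i++) a[i] = op.applyAsInt(last, a[i]);
}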

EDIT: This answer suggests one way of computing the prefixes of an array in parallel. It's not necessarily the most efficient way, and not necessarily the way used by the JDK implementation. You can further read about parallel algorithms for solving that problem here.



Answer 3:

I read both the answers and still could not completely understand how this is done, so I decided to draw an example instead. Here is what I came up with. Suppose this is the array that we start with (with 3 CPUs):

7, 9, 6, 1, 8, 7, 3, 4, 9

So each of the 3 threads will get its chunk to work on:

Thread 1:  7, 9, 6
Thread 2:  1, 8, 7
Thread 3:  3, 4, 9

Since the documentation mandates an associative function, we can compute the full prefix sums in the first thread and partial sums in the other ones; once the first thread's result is known, all of them will be. Let's see what 7, 9, 6 would become:

7, 9, 6  -> 7, 16, 22

So the sum in the first thread is 22, but the other threads have no idea about that yet; what they do instead is treat it as an unknown x. Thus Thread 2 will compute:

1, 8, 7 -> 1 (+x), 9 (+x), 16 (+x)

Thus the sum from the second thread would be x + 16, so in Thread 3 we would have:

3, 4, 9 -> 3 (+ x + 16), 7 (+ x + 16), 16 (+ x + 16)

3, 4, 9 -> x + 19, x + 23, x + 32

This way, as soon as I know x, I know all other results too.
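Plugging in x = 22 gives 23, 31, 38 for Thread 2 and 41, 45, 54 for Thread 3, so the final array is 7, 16, 22, 23, 31, 38, 41, 45, 54, exactly the sequential prefix sums.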

Disclaimer: I am not sure this is how it is actually implemented (I tried looking at the code, but it's too complicated).