Fork Join optimization

2019-02-17 02:34发布

问题:

What I want

I want to work on optimization of fork/join algorithm. By optimization I mean just calculation of optimal number of threads, or if you want - calculation of SEQUENTIAL_THRESHOLD (see code below).

// PSEUDOCODE
Result solve(Problem problem) { 
    if (problem.size < SEQUENTIAL_THRESHOLD)
        return solveSequentially(problem);
    else {
        Result left, right;
        INVOKE-IN-PARALLEL { 
            left = solve(extractLeftHalf(problem));
            right = solve(extractRightHalf(problem));
        }
        return combine(left, right);
    }
}

How do I imagine that

For example, I want to calculate the product of big array. Then I just evaluate all components and get the optimal threads amount:

SEQUENTIAL_THRESHOLD = PC * IS / MC (just example)

PC - number of processor cores; IS - constant, that indicates the optimal array size with one processor core and the simplest operation on data (for example reading); MC - multiply operation cost;

Suppose MC = 15; PC = 4 and IS = 10000; SEQUENTIAL_THRESHOLD = 2667. Than if subtask-array is bigger than 2667 I'll fork it.

Broad questions

  1. Is it possible to make SEQUENTIAL_THRESHOLD formula in such way?
  2. Is it possible to accomplish the same for more complex computation: not only for operations on arrays/collections and sorting?

Narrow question:

Do already exist some investigations about calculation of SEQUENTIAL_THRESHOLD for arrays/collections/sorting? How do they accomplish that?

Updated 07 March 2014:

  1. If there is no way to write a single formula for threshold calculation, can I write an util which will perform predefined tests on PC, and than gets the optimal threshold? Is that also impossible or not?
  2. What can Java 8 Streams API do? Can it help me? Does Java 8 Streams API eliminate a need in Fork/Join?

回答1:

There is absolutely, positively no way to calculate a proper threshold unless you are intimate with the execution environment. I maintain a fork/join project on sourceforge.net and this is the code I use in most built-in-functions:

private int calcThreshold(int nbr_elements, int passed_threshold) {

  // total threads in session
  // total elements in array
  int threads = getNbrThreads();
  int count   = nbr_elements + 1;

  // When only one thread, it doesn't pay to decompose the work,
  //   force the threshold over array length
  if  (threads == 1) return count;    

  /*
   * Whatever it takes
   * 
   */
  int threshold = passed_threshold;

  // When caller suggests a value
  if  (threshold > 0) {

      // just go with the caller's suggestion or do something with the suggestion

  } else {
      // do something usful such as using about 8 times as many tasks as threads or
      //   the default of 32k
      int temp = count / (threads << 3);
      threshold = (temp < 32768) ? 32768 : temp;

  } // endif    

  // whatever
  return threshold;

}

Edit on 9 March:

How can you possibly have a general utility that can know not only the processor speed, memory available, number of processors, etc. (the physical environment) but the intention of the software? The answer is you cannot. Which is why you need to develop a routine for each environment. The above method is what I use for basic arrays (vectors.) I use another for most matrix processing:

// When very small, just spread every row
if  (count < 6) return 1;

// When small, spread a little 
if  (count < 30) return ((count / (threads << 2) == 0)? threads : (count / (threads << 2)));  

// this works well for now
return ((count / (threads << 3) == 0)? threads : (count / (threads << 3))); 

As far as Java8 streams: They use the F/J framework under the hood and you cannot specify a threshold.



回答2:

You cannot boil this down to a simple formula for several reasons:

  • Each PC will have vastly different parameters depending not only on the core, but also on other factors like RAM timing or background tasks.

  • Java itself is optimizing loops on the fly during execution. So a momentary perfect setting could be sub-optimal a few seconds later. Or worse: the adjustment could prevent perfect optimization all together.

The only way to go that I can see is to dynamically adjust the values in some form of AI or genetic algorithm. However that includes that the program frequently checks non-optimal settings just to determine whether or not the current setting is still the best. So it is questionable if the speed gained is actually higher than the speed lost for trying other settings. In the end probably only a solution during an initial learning phase, while further executions then use those trained values as fixed numbers.

As this not only costs time but also greatly increases the code complexity, I don't think this is an option for most programs. Often it is more beneficial to not even use Fork-Join in the first place, as there are many other parallelization options that might better suit the problem.

An idea for a "genetic" algorithm would be to measure the loop efficiency each run, then have a background hash-map loop-parameters -> execution time that is constantly updated, and the fastest setting is selected for most of the runs.



回答3:

This is a very interesting problem to investigate. I have written this simple code to test the optimal value of sequential threshold. I was unable to reach any concrete conclusions though, most probably because I am running it on a old laptop with only 2 processors. The only consistent observation after many runs was that the time taken drops rapidly till sequential threshold of 100. Try running this code and let me know what you find. Also at the bottom I have attached a python script for plotting the results so that we can visually see the trend.

import java.io.FileWriter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class Testing {

static int SEQ_THRESHOLD;

public static void main(String[] args) throws Exception {
    int size = 100000;
    int[] v1 = new int[size];
    int[] v2 = new int[size];
    int[] v3 = new int[size];
    for (int i = 0; i < size; i++) {
        v1[i] = i;  // Arbitrary initialization
        v2[i] = 2 * i; // Arbitrary initialization
    }
    FileWriter fileWriter = new FileWriter("OutTime.dat");

    // Increment SEQ_THRESHOLD and save time taken by the code to run in a file
    for (SEQ_THRESHOLD = 10; SEQ_THRESHOLD < size; SEQ_THRESHOLD += 50) {
        double avgTime = 0.0;
        int samples = 5;
        for (int i = 0; i < samples; i++) {
            long startTime = System.nanoTime();
            ForkJoinPool fjp = new ForkJoinPool();
            fjp.invoke(new VectorAddition(0, size, v1, v2, v3));
            long endTime = System.nanoTime();
            double secsTaken = (endTime - startTime) / 1.0e9;
            avgTime += secsTaken;
        }
        fileWriter.write(SEQ_THRESHOLD + " " + (avgTime / samples) + "\n");
    }

    fileWriter.close();
}
}

class VectorAddition extends RecursiveAction {

int[] v1, v2, v3;
int start, end;

VectorAddition(int start, int end, int[] v1, int[] v2, int[] v3) {
    this.start = start;
    this.end = end;
    this.v1 = v1;
    this.v2 = v2;
    this.v3 = v3;
}

int SEQ_THRESHOLD = Testing.SEQ_THRESHOLD;

@Override
protected void compute() {
    if (end - start < SEQ_THRESHOLD) {
        // Simple vector addition
        for (int i = start; i < end; i++) {
            v3[i] = v1[i] + v2[i];
        }
    } else {
        int mid = (start + end) / 2;
        invokeAll(new VectorAddition(start, mid, v1, v2, v3),
                new VectorAddition(mid, end, v1, v2, v3));
    }
}
}

and here is the Python script for plotting results:

from pylab import *

threshold = loadtxt("./OutTime.dat", delimiter=" ", usecols=(0,))
timeTaken = loadtxt("./OutTime.dat", delimiter=" ", usecols=(1,))

plot(threshold, timeTaken)
show()