Fork Join optimization

What I want

I want to work on optimizing a fork/join algorithm. By optimization I mean simply calculating the optimal number of threads, or, if you prefer, calculating the SEQUENTIAL_THRESHOLD (see the pseudocode below).

// PSEUDOCODE
Result solve(Problem problem) { 
    if (problem.size < SEQUENTIAL_THRESHOLD)
        return solveSequentially(problem);
    else {
        Result left, right;
        INVOKE-IN-PARALLEL { 
            left = solve(extractLeftHalf(problem));
            right = solve(extractRightHalf(problem));
        }
        return combine(left, right);
    }
}
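For concreteness, here is how that pseudocode maps onto Java's fork/join framework. This is a minimal runnable sketch, assuming the "problem" is summing a long array; the array contents, threshold value, and combine step (addition) are illustrative choices, not part of the original pseudocode:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    static final int SEQUENTIAL_THRESHOLD = 1000; // placeholder value

    final long[] data;
    final int lo, hi;

    SumTask(long[] data, int lo, int hi) {
        this.data = data; this.lo = lo; this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo < SEQUENTIAL_THRESHOLD) {          // solveSequentially
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);     // extractLeftHalf
        SumTask right = new SumTask(data, mid, hi);    // extractRightHalf
        left.fork();                                   // INVOKE-IN-PARALLEL
        return right.compute() + left.join();          // combine(left, right)
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        long sum = new ForkJoinPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(sum); // 4999950000
    }
}
```

Note that forking one half and computing the other in the current thread avoids paying the fork overhead twice per split.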

How do I imagine that

For example, say I want to calculate the product of a big array. Then I just evaluate all the components and get the optimal threshold:

SEQUENTIAL_THRESHOLD = PC * IS / MC (just an example)

PC - number of processor cores; IS - a constant indicating the optimal array size for one processor core performing the simplest operation on the data (for example, reading); MC - the cost of a multiply operation.

Suppose MC = 15, PC = 4 and IS = 10000; then SEQUENTIAL_THRESHOLD = 2667. So if a subtask's array is bigger than 2667, I'll fork it.
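As a sanity check of the arithmetic (the formula itself is only the question's hypothesis, not an established result), ceiling integer division reproduces the 2667 above:

```java
public class ThresholdFormula {
    // Hypothetical formula from the question: SEQUENTIAL_THRESHOLD = PC * IS / MC,
    // rounded up so the threshold never undershoots the real quotient.
    static int threshold(int pc, int is, int mc) {
        return (pc * is + mc - 1) / mc; // ceiling division
    }

    public static void main(String[] args) {
        // 4 * 10000 / 15 = 2666.67 -> 2667
        System.out.println(threshold(4, 10000, 15)); // prints 2667
    }
}
```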

Broad questions

  1. Is it possible to build a SEQUENTIAL_THRESHOLD formula this way?
  2. Is it possible to do the same for more complex computations, not only operations on arrays/collections and sorting?

Narrow question:

Does any existing research investigate the calculation of SEQUENTIAL_THRESHOLD for arrays/collections/sorting? How is it accomplished there?

Updated 07 March 2014:

  1. If there is no way to write a single formula for threshold calculation, can I write a utility that performs predefined tests on the PC and then derives the optimal threshold? Is that also impossible?
  2. What can the Java 8 Streams API do? Can it help me? Does the Java 8 Streams API eliminate the need for Fork/Join?
Critique answered 5/3, 2014 at 7:32 Comment(4)
"The ideal threshold for choosing between sequential and parallel execution is a function of the cost of coordinating the parallel tasks. If coordination costs are zero, a larger number of finer-grained tasks tend to offer better parallelism; the lower the coordination costs, the finer-grained we can go before we need to switch to a sequential approach." - Quote from the website where the pseudocode is from: ibm.com/developerworks/library/j-jtp11137 ;)Running
In other words, don’t use the number of CPU cores as the number of available cores can change at any time. The threshold should just be large enough so that the overhead of splitting does not matter compared to the problem size. The more subtasks the more freedom for the Executor/scheduler to adapt to the real system’s load situation.Koblenz
This is a typical question for map-reduce tasks. You should check out the Streams API of Java 8. It takes care of near-optimal execution. You should not be too concerned about reaching the optimum as long as you're not doing HPC. You want your desktop to remain responsive if the computation happens there, even if it takes a little longer.Clouded
@Clouded I'm asking about HPC :) More details about the Streams API, please. The docs don't provide enough information about how Java 8 parallelism optimizes itself.Critique

There is absolutely, positively no way to calculate a proper threshold unless you are intimate with the execution environment. I maintain a fork/join project on sourceforge.net, and this is the code I use in most built-in functions:

private int calcThreshold(int nbr_elements, int passed_threshold) {

  // total threads in session
  // total elements in array
  int threads = getNbrThreads();
  int count   = nbr_elements + 1;

  // When only one thread, it doesn't pay to decompose the work,
  //   force the threshold over array length
  if  (threads == 1) return count;    

  /*
   * Whatever it takes
   * 
   */
  int threshold = passed_threshold;

  // When caller suggests a value
  if  (threshold > 0) {

      // just go with the caller's suggestion or do something with the suggestion

  } else {
      // do something useful, such as using about 8 times as many tasks as threads, or
      //   the default of 32k
      int temp = count / (threads << 3);
      threshold = (temp < 32768) ? 32768 : temp;

  } // endif    

  // whatever
  return threshold;

}
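Plugging in numbers makes the default branch easier to see. This stand-alone copy of the else-branch logic is a sketch: the thread count here is a stand-in for whatever getNbrThreads() would return in the real session.

```java
public class CalcThresholdDemo {
    // Copy of the default branch above: aim for about 8 tasks per thread,
    // but never go below the 32k floor.
    static int defaultThreshold(int count, int threads) {
        int temp = count / (threads << 3);
        return (temp < 32768) ? 32768 : temp;
    }

    public static void main(String[] args) {
        // Small array: 1_000_001 / 32 = 31250, below the floor, so 32768 wins.
        System.out.println(defaultThreshold(1_000_001, 4));  // 32768
        // Large array: 10_000_001 / 32 = 312500, above the floor.
        System.out.println(defaultThreshold(10_000_001, 4)); // 312500
    }
}
```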

Edit on 9 March:

How could a general utility possibly know not only the processor speed, available memory, number of processors, etc. (the physical environment) but also the intention of the software? The answer is that it cannot. That is why you need to develop a routine for each environment. The method above is what I use for basic arrays (vectors). I use another for most matrix processing:

// When very small, just spread every row
if  (count < 6) return 1;

// When small, spread a little 
if  (count < 30) return ((count / (threads << 2) == 0)? threads : (count / (threads << 2)));  

// this works well for now
return ((count / (threads << 3) == 0)? threads : (count / (threads << 3))); 

As for Java 8 streams: they use the F/J framework under the hood, and you cannot specify a threshold.
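That is easy to observe directly: parallel streams execute on ForkJoinPool.commonPool(), and the split granularity is decided internally by the library's spliterator machinery rather than by a user-supplied threshold. A small sketch (the range and sum are illustrative):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class StreamsUnderTheHood {
    public static void main(String[] args) {
        // Parallelism of the shared pool that parallel streams run on.
        System.out.println(ForkJoinPool.commonPool().getParallelism());

        // No threshold parameter anywhere: the library splits the range itself.
        long sum = LongStream.range(0, 1_000_000).parallel().sum();
        System.out.println(sum); // 499999500000
    }
}
```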

Forging answered 5/3, 2014 at 15:22 Comment(0)

You cannot boil this down to a simple formula for several reasons:

  • Each PC will have vastly different parameters depending not only on the processor cores, but also on other factors like RAM timing or background tasks.

  • Java itself optimizes loops on the fly during execution. So a momentarily perfect setting could be sub-optimal a few seconds later. Or worse: the adjustment could prevent perfect optimization altogether.

The only way I can see is to dynamically adjust the values via some form of AI or genetic algorithm. However, that means the program must frequently try non-optimal settings just to determine whether the current setting is still the best. So it is questionable whether the speed gained actually exceeds the speed lost trying other settings. In the end it is probably only a solution during an initial learning phase, with later executions using the trained values as fixed numbers.

As this not only costs time but also greatly increases the code complexity, I don't think this is an option for most programs. Often it is more beneficial to not even use Fork-Join in the first place, as there are many other parallelization options that might better suit the problem.

An idea for a "genetic" algorithm would be to measure the loop efficiency each run, then have a background hash-map loop-parameters -> execution time that is constantly updated, and the fastest setting is selected for most of the runs.
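A minimal sketch of that hash-map idea, assuming a fixed candidate set of thresholds and a caller-supplied workload (both are illustrative assumptions; a real tuner would re-measure periodically and smooth over noise):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntToLongFunction;

public class ThresholdTuner {
    // Run the workload once per candidate threshold, record the timings,
    // and return the threshold with the lowest measured execution time.
    public static int bestThreshold(int[] candidates, IntToLongFunction workload) {
        Map<Integer, Long> timings = new HashMap<>(); // threshold -> nanos
        for (int t : candidates) {
            long start = System.nanoTime();
            workload.applyAsLong(t); // execute the loop with this threshold
            timings.put(t, System.nanoTime() - start);
        }
        return timings.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .get().getKey();
    }
}
```

In a learning-phase design, the winner returned here would then be used as a fixed number for subsequent executions on the same machine.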

Fari answered 5/3, 2014 at 15:16 Comment(4)
yes, I can make something like loop-parameters -> execution time. But then I would have to calculate that table for each PC configuration, which is impossible. How can I link all that with PC configs, or which parameters should I consider?Critique
And is there any services with 16+ cores where I can test performance of my fork/join algorithms?Critique
You cannot pre-calculate those parameters, you need to do it on the actual machine. So either have something like a calibration phase added to your code to determine those parameters, or - if you only have a small set of machines - do some measurements by hand before the actual execution.Fari
please look at the updated version of my question. Also, I'm interested in making something universal, not something for a fixed range of machinesCritique

This is a very interesting problem to investigate. I wrote this simple code to test the optimal value of the sequential threshold. I was unable to reach any concrete conclusions, though, most probably because I am running it on an old laptop with only 2 processors. The only consistent observation after many runs was that the time taken drops rapidly up to a sequential threshold of about 100. Try running this code and let me know what you find. At the bottom I have also attached a Python script for plotting the results so that we can see the trend visually.

import java.io.FileWriter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class Testing {

    static int SEQ_THRESHOLD;

    public static void main(String[] args) throws Exception {
        int size = 100000;
        int[] v1 = new int[size];
        int[] v2 = new int[size];
        int[] v3 = new int[size];
        for (int i = 0; i < size; i++) {
            v1[i] = i;     // Arbitrary initialization
            v2[i] = 2 * i; // Arbitrary initialization
        }
        FileWriter fileWriter = new FileWriter("OutTime.dat");

        // Increment SEQ_THRESHOLD and save the time taken by each run to a file
        for (SEQ_THRESHOLD = 10; SEQ_THRESHOLD < size; SEQ_THRESHOLD += 50) {
            double avgTime = 0.0;
            int samples = 5;
            for (int i = 0; i < samples; i++) {
                long startTime = System.nanoTime();
                ForkJoinPool fjp = new ForkJoinPool();
                fjp.invoke(new VectorAddition(0, size, v1, v2, v3));
                long endTime = System.nanoTime();
                fjp.shutdown(); // release the pool's threads between samples
                double secsTaken = (endTime - startTime) / 1.0e9;
                avgTime += secsTaken;
            }
            fileWriter.write(SEQ_THRESHOLD + " " + (avgTime / samples) + "\n");
        }

        fileWriter.close();
    }
}

class VectorAddition extends RecursiveAction {

    int[] v1, v2, v3;
    int start, end;

    // Snapshot the current global threshold at task-creation time
    int SEQ_THRESHOLD = Testing.SEQ_THRESHOLD;

    VectorAddition(int start, int end, int[] v1, int[] v2, int[] v3) {
        this.start = start;
        this.end = end;
        this.v1 = v1;
        this.v2 = v2;
        this.v3 = v3;
    }

    @Override
    protected void compute() {
        if (end - start < SEQ_THRESHOLD) {
            // Simple vector addition
            for (int i = start; i < end; i++) {
                v3[i] = v1[i] + v2[i];
            }
        } else {
            int mid = (start + end) / 2;
            invokeAll(new VectorAddition(start, mid, v1, v2, v3),
                      new VectorAddition(mid, end, v1, v2, v3));
        }
    }
}

and here is the Python script for plotting results:

from pylab import *

threshold = loadtxt("./OutTime.dat", delimiter=" ", usecols=(0,))
timeTaken = loadtxt("./OutTime.dat", delimiter=" ", usecols=(1,))

plot(threshold, timeTaken)
show()
Ankle answered 5/3, 2014 at 10:10 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.