How is the fork/join framework better than a thread pool?

What are the benefits of using the new fork/join framework over simply splitting the big task into N subtasks in the beginning, sending them to a cached thread pool (from Executors) and waiting for each task to complete? I fail to see how using the fork/join abstraction simplifies the problem or makes the solution more efficient than what we've had for years now.

For example, the parallelized blurring algorithm in the tutorial example could be implemented like this:

public class Blur implements Runnable {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    private int mBlurWidth = 15; // Processing window size, should be odd.

    public Blur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    public void run() {
        computeDirectly();
    }

    protected void computeDirectly() {
        // As in the example, omitted for brevity
    }
}

Split in the beginning and send tasks to a thread pool:

// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool

int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future<?>> futures = new ArrayList<>();

// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
    int size = Math.min(maxSize, src.length - i);
    Blur task = new Blur(src, i, size, dst);
    Future<?> f = threadPool.submit(task);
    futures.add(f);
}

// Wait for all sent tasks to complete:
for (Future<?> future : futures) {
    future.get(); // may throw InterruptedException / ExecutionException
}

// Done!

The tasks go to the thread pool's queue, from which worker threads pick them up as they become available. As long as the splitting is granular enough (so that nobody has to wait unduly for the last task) and the thread pool has enough threads (at least N, the number of processors), all processors work at full speed until the whole computation is done.

Am I missing something? What's the added value of using the fork/join framework?

Amoreta answered 28/10, 2011 at 8:31 Comment(0)

I think the basic misunderstanding is that the Fork/Join examples do NOT show work stealing, only a standard kind of divide and conquer.

Work stealing would be like this: Worker B has finished his work. He is a kind one, so he looks around and sees Worker A still working very hard. He strolls over and asks: "Hey lad, I could give you a hand." A replies: "Cool, I have this task of 1000 units. So far I have finished 345, leaving 655. Could you please work on numbers 673 to 1000? I'll do 346 to 672." B says: "OK, let's start so we can go to the pub earlier."

You see - the workers must communicate with each other even after they have started the real work. This is the missing part in the examples.

The examples on the other hand show only something like "use subcontractors":

Worker A: "Dang, I have 1000 units of work. Too much for me. I'll do 500 myself and subcontract 500 to someone else." This goes on until the big task is broken down into small packets of 10 units each. These will be executed by the available workers. But if one packet is a kind of poison pill and takes considerably longer than the other packets -- bad luck, the divide phase is already over.

The only remaining difference between Fork/Join and splitting the task upfront is this: when splitting upfront, the work queue is full right from the start. Example: 1000 units, threshold 10, so the queue has 100 entries. These packets are distributed to the thread pool members.

Fork/Join is more complex and tries to keep the number of packets in the queue smaller:

  • Step 1: Put one packet containing (1...1000) into the queue.
  • Step 2: One worker pops packet (1...1000) and replaces it with two packets: (1...500) and (501...1000).
  • Step 3: One worker pops packet (501...1000) and pushes (501...750) and (751...1000).
  • Step n: The stack contains these packets: (1...500), (501...750), (751...875), ... (991...1000).
  • Step n+1: Packet (991...1000) is popped and executed.
  • Step n+2: Packet (981...990) is popped and executed.
  • Step n+3: Packet (961...980) is popped and split into (961...970) and (971...980). And so on.

You see: in Fork/Join the queue is smaller (6 in the example) and the "split" and "work" phases are interleaved.

When multiple workers are popping and pushing simultaneously the interactions are not so clear of course.
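
In code, the divide-and-conquer shape described above is just a RecursiveAction. Here is a minimal sketch; the class name, threshold and range bounds are invented for illustration, mirroring the (1...1000) example:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class RangeTask extends RecursiveAction {
    private static final int THRESHOLD = 10; // "packet" size
    private final int from, to;              // inclusive unit range, e.g. (1...1000)

    RangeTask(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from + 1 <= THRESHOLD) {
            work();                    // "work" phase: process this small packet
        } else {
            int mid = (from + to) / 2; // "split" phase: push two halves onto
                                       // this worker's own deque
            invokeAll(new RangeTask(from, mid), new RangeTask(mid + 1, to));
        }
    }

    private void work() {
        // process units from..to
    }
}

// Usage: new ForkJoinPool().invoke(new RangeTask(1, 1000));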

Fatally answered 28/10, 2011 at 11:49 Comment(5)
I think this is indeed the answer. I wonder if there are actual Fork/Join examples anywhere that would also demonstrate its work-stealing capabilities? With elementary examples the amount of workload is pretty much perfectly predictable from the unit's size (e.g. array length), so upfront splitting is easy. Stealing would certainly make a difference in problems where the amount of workload per unit is not well predictable from the unit's size.Amoreta
A.H. If your answer is correct, it doesn't explain how. The example given by Oracle does not result in work stealing. How would fork and join work as in the example you are describing here? Could you show some Java code which would make fork and join steal work the way you describe it? ThanksMidian
@Marc: I'm sorry, but I have no example available.Fatally
The problem with Oracle's example, IMO, is not that it doesn't demonstrate work stealing (it does, as described by A.H.) but that it is easy to code an algorithm for a simple ThreadPool which does as well (as Joonas did). F-J is most useful when the work cannot be pre-split into enough independent tasks but can be recursively split into tasks which are independent amongst themselves. See my answer for an exampleDarlleen
Some examples of where work stealing might come in handy: h-online.com/developer/features/…Loadstar

If you have n busy threads all working away at 100% independently, that's going to be better than n threads in a Fork-Join (FJ) pool. But it never works out that way.

You might not be able to split the problem into n precisely equal pieces. Even if you do, thread scheduling is some way off being fair, and you'll end up waiting for the slowest thread. If you have multiple tasks, they can each run with less than n-way parallelism (generally more efficient), yet ramp up to n-way when the other tasks have finished.

So why don't we just cut the problem up into FJ-size pieces and have a thread pool work on that? Typical FJ usage cuts the problem into tiny pieces. Doing these in a random order requires a lot of coordination at the hardware level; the overheads would be a killer. In FJ, tasks are put onto a queue that the thread reads off in Last In, First Out order (LIFO/stack), while work stealing (from other threads' queues, generally) is done First In, First Out (FIFO/"queue"). The result is that long array processing can be done largely sequentially, even though it is broken into tiny chunks. (It is also the case that it might not be trivial to break the problem up into small, evenly sized chunks in one big bang. Say, dealing with some form of hierarchy without balancing.)

Conclusion: FJ allows more efficient use of hardware threads in uneven situations, which is almost always the case if you have more than one thread.

Pitman answered 28/10, 2011 at 8:45 Comment(3)
But why wouldn't FJ end up waiting for the slowest thread too? There's a predetermined number of subtasks, and of course some of them will always be the last one to complete. Adjusting the maxSize parameter in my example would produce an almost identical subtask division to the "binary splitting" in the FJ example (done within the compute() method, which either computes something or sends subtasks to invokeAll()).Amoreta
Because they are much smaller - I'll add to my answer.Pitman
Ok, if the number of subtasks is order(s) of magnitude larger than what can actually be processed in parallel (which makes sense, to avoid having to wait for the last one), then I can see the coordination issues. The FJ example may be misleading if the division is supposed to be that granular: it uses a threshold of 100000, which for a 1000x1000 image would produce 16 actual subtasks, each processing 62500 elements. For a 10000x10000 image there would be 1024 subtasks, which is already something.Amoreta

The ultimate goals of thread pools and Fork/Join are alike: both want to utilize the available CPU power as best they can for maximum throughput. Maximum throughput means that as many tasks as possible should be completed over a long period of time. What is needed to do that? (For the following we will assume that there is no shortage of calculation tasks: there is always enough to do for 100% CPU utilisation. Additionally, I use "CPU" equivalently for cores or virtual cores in the case of hyper-threading.)

  1. At a minimum, there need to be as many threads running as there are CPUs available, because running fewer threads will leave a core unused.
  2. At a maximum, there must be no more threads running than there are CPUs available, because running more threads creates additional load for the scheduler that assigns CPUs to the different threads, which causes some CPU time to go to the scheduler rather than to our computational task.

Thus we figured out that for maximum throughput we need exactly as many threads as CPUs. In Oracle's blurring example you can either take a fixed-size thread pool with the number of threads equal to the number of available CPUs or use a fork/join pool. It won't make a difference; you are right!
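
For example, such a fixed-size pool can be created with one thread per CPU (a trivial sketch, not from the original answer):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// One thread per available (virtual) CPU:
ExecutorService pool =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());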

So when will you get into trouble with a thread pool? When a thread blocks, because it is waiting for another task to complete. Assume the following example:

class AbcAlgorithm implements Runnable {
    public void run() {
        // threadPool is some shared ExecutorService
        Future<StepAResult> aFuture = threadPool.submit(new ATask());
        StepBResult bResult = stepB();
        try {
            StepAResult aResult = aFuture.get(); // blocks until A is done
            stepC(aResult, bResult);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}

What we see here is an algorithm that consists of three steps A, B and C. A and B can be performed independently of each other, but step C needs the results of both step A AND step B. What this algorithm does is submit task A to the thread pool and perform task B directly. After that, the thread waits for task A to be done as well and continues with step C. If A and B complete at the same time, everything is fine. But what if A takes longer than B? That may be because the nature of task A dictates it, but it may also be because no thread for task A is available in the beginning and task A needs to wait. (If there is only a single CPU available, and thus your thread pool has only a single thread, this will even cause a deadlock, but for now that is beside the point.) The point is that the thread that just executed task B now blocks doing nothing. Since we have the same number of threads as CPUs, and one thread is blocked, that means one CPU is idle.

Fork/Join solves this problem: In the fork/join framework you'd write the same algorithm as follows:

class AbcAlgorithm implements Runnable {
    public void run() {
        ATask aTask = new ATask();
        aTask.fork();
        StepBResult bResult = stepB();
        StepAResult aResult = aTask.join();
        stepC(aResult, bResult);
    }
}

Looks the same, does it not? However the key is that aTask.join() will not simply block. Instead this is where work stealing comes into play: the thread will look around for other tasks that have been forked in the past and continue with those. First it checks whether the tasks it forked itself have started processing. So if A has not been picked up by another thread yet, it will do A next; otherwise it will check the queues of other threads and steal their work. Once this other task of another thread has completed, it will check whether A is completed now. If it is, the algorithm above can call stepC; otherwise it will look for yet another task to steal. Thus fork/join pools can achieve 100% CPU utilisation, even in the face of blocking actions.

However there is a trap: work stealing is only possible for the join call of ForkJoinTasks. It cannot be done for external blocking actions like waiting for another thread or for an I/O action to complete. So what about I/O, which is a common blocking task? In this case, the second-best thing is to add an additional thread to the fork/join pool that is stopped again as soon as the blocking action has completed. And the ForkJoinPool can actually do just that if we use ManagedBlockers.
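
For illustration, here is a minimal ManagedBlocker sketch, modelled on the pattern shown in the ForkJoinPool JavaDoc; the blocking queue is just a stand-in for any blocking resource, and the class name is invented:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;

class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
    private final BlockingQueue<E> queue;
    private volatile E item;

    QueueTaker(BlockingQueue<E> queue) {
        this.queue = queue;
    }

    public boolean block() throws InterruptedException {
        if (item == null) {
            item = queue.take(); // the actual blocking call
        }
        return true; // done, no further blocking necessary
    }

    public boolean isReleasable() {
        // Try to avoid blocking altogether
        return item != null || (item = queue.poll()) != null;
    }

    E getItem() {
        return item;
    }
}

// Inside a ForkJoinTask: the pool may start a spare worker while we block.
// QueueTaker<String> taker = new QueueTaker<>(someQueue);
// ForkJoinPool.managedBlock(taker);
// String s = taker.getItem();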

Fibonacci

In the JavaDoc for RecursiveTask there is an example of calculating Fibonacci numbers using Fork/Join. For comparison, the classic recursive solution:

public static long fib(long n) { // long, since fib(50) overflows an int
    if (n <= 1) {
        return n;
    }
    return fib(n - 1) + fib(n - 2);
}

As is explained in the JavaDocs, this is a pretty dumb way to calculate Fibonacci numbers, as the algorithm has O(2^n) complexity while much simpler ways are possible. However the algorithm is very simple and easy to understand, so we stick with it. Let's assume we want to speed this up with Fork/Join. A naive implementation would look like this:

class Fibonacci extends RecursiveTask<Long> {
    private final long n;

    Fibonacci(long n) {
        this.n = n;
    }

    public Long compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
    }
}

The steps this task is split into are way too short, so this will perform horribly, but you can see how the framework works in general: the two summands can be calculated independently, but then we need both of them to build the final result. So one half is done in another thread. Have fun doing the same with thread pools without getting a deadlock (possible, but not nearly as simple).

Just for completeness: if you actually wanted to calculate Fibonacci numbers using this recursive approach, here is an optimized version:

class FibonacciBigSubtasks extends RecursiveTask<Long> {
    private final long n;

    FibonacciBigSubtasks(long n) {
        this.n = n;
    }

    public Long compute() {
        return fib(n);
    }

    private long fib(long n) {
        if (n <= 1) {
            return n; // consistent with the classic version (fib(0) = 0, fib(1) = 1)
        }
        if (n > 10 && getSurplusQueuedTaskCount() < 2) {
            final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
            final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
            f1.fork();
            return f2.compute() + f1.join();
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

This keeps the number of subtasks much smaller, because a task is only split when n > 10 && getSurplusQueuedTaskCount() < 2 is true, i.e. when there are significantly more than 100 method calls left to do (n > 10) and there are not very many tasks already waiting (getSurplusQueuedTaskCount() < 2).
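
For completeness, such a task could be invoked like this (a hypothetical driver, not part of the original answer):

import java.util.concurrent.ForkJoinPool;

ForkJoinPool pool = new ForkJoinPool(); // parallelism defaults to the number of CPUs
long result = pool.invoke(new FibonacciBigSubtasks(50));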

On my computer (4 cores, 8 when counting hyper-threading, Intel(R) Core(TM) i7-2720QM CPU @ 2.20GHz), fib(50) takes 64 seconds with the classic approach and just 18 seconds with the Fork/Join approach, which is quite a noticeable gain, although not as much as theoretically possible.

Summary

  • Yes, in your example Fork/Join has no advantage over classic thread pools.
  • Fork/Join can drastically improve performance when blocking is involved.
  • Fork/Join circumvents some deadlock problems.
Meletius answered 16/5, 2016 at 15:33 Comment(0)

Fork/join is different from a thread pool because it implements work stealing. From the Fork/Join documentation:

As with any ExecutorService, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.

Say you have two threads, and 4 tasks a, b, c, d which take 1, 1, 5 and 6 seconds respectively. Initially, a and b are assigned to thread 1 and c and d to thread 2. In a thread pool, this would take 11 seconds. With fork/join, thread 1 finishes and can steal work from thread 2, so task d would end up being executed by thread 1. Thread 1 executes a, b and d, thread 2 just c. Overall time: 8 seconds, not 11.

EDIT: As Joonas points out, tasks are not necessarily pre-allocated to a thread. The idea of fork/join is that a thread can choose to split a task into multiple sub-pieces. So to restate the above:

We have two tasks (ab) and (cd) which take 2 and 11 seconds respectively. Thread 1 starts to execute ab and split it into two sub-tasks a & b. Similarly with thread 2, it splits into two sub-tasks c & d. When thread 1 has finished a & b, it can steal d from thread 2.

Madrepore answered 28/10, 2011 at 9:46 Comment(7)
Thread pools are typically ThreadPoolExecutor instances. In such, tasks go into a queue (a BlockingQueue in practice), from which the worker threads take tasks as soon as they've finished their previous task. Tasks are not pre-assigned to specific threads, as far as I understand. Each thread has (at most) one task at a time.Amoreta
AFAIK there is one queue per ThreadPoolExecutor, which in turn controls several threads. This means that when assigning tasks or Runnables (not Threads!) to an executor, the tasks are also not preallocated to specific threads. Exactly the way FJ does it too. So far no benefit to using FJ.Fatally
@Fatally Yes, but fork/join allows you to split the current task. The thread that is executing the task can split it into two different tasks. So with the ThreadPoolExecutor you have a fixed list of tasks. With fork/join, the executing task can split its own task into two, which can then be picked up by other threads when they've finished their work. Or by you, if you finish first.Madrepore
@Matthew Farwell: In the FJ example, within each task, compute() either computes the task, or splits it into two subtasks. Which option it chooses depends only on the task's size (if (mLength < sThreshold)...), so it's just a fancy way of creating a fixed number of tasks. For a 1000x1000 image, there will be exactly 16 subtasks that actually compute something. Additionally there will be 15 (= 16 - 1) "intermediate" tasks that only generate and invoke subtasks and don't compute anything themselves.Amoreta
@Joonas, yes, that is the strategy they've chosen for splitting. But they don't know how long each subtask will take. It's possible that one subtask will take 1 second and another (of the same 'size') will take 15 seconds. In this case, work stealing could happen. Maybe I'm misunderstanding what you're saying.Madrepore
@Matthew Farwell: It's possible that I don't understand all of FJ, but if a subtask has decided to execute its computeDirectly() method, there's no way to steal anything any more. The whole splitting is done a priori, at least in the example.Amoreta
@JoonasPulakka: I have written an answer which tries to address the things in this discussion.Fatally

In this example Fork/Join adds no value because forking is not needed and the workload is evenly split across worker threads. Fork/Join only adds overhead.

Here is a nice article on the subject. Quote:

Overall, we can say that the ThreadPoolExecutor is to be preferred where the workload is evenly split across worker threads. To be able to guarantee this, you do need to know precisely what the input data looks like. By contrast, the ForkJoinPool provides good performance irrespective of the input data and is thus a significantly more robust solution.

Loadstar answered 2/12, 2013 at 23:33 Comment(0)

Everyone above is correct that the benefits are achieved by work stealing, but to expand on why this is:

The primary benefit is the efficient coordination between worker threads. The work has to be split up and reassembled, which requires coordination. As you can see in A.H.'s answer above, each thread has its own work list. An important property of this list is that it is sorted (large tasks at the top, small tasks at the bottom). Each thread executes the tasks at the bottom of its own list and steals tasks from the top of other threads' lists.

The result of this is:

  • The head and tail of the task lists can be synchronised independently, reducing contention on the list.
  • Significant subtrees of the work are split up and reassembled by the same thread, so no inter-thread coordination is required for those subtrees.
  • When a thread steals work it takes a large piece, which it then subdivides onto its own list.
  • The work stealing means the threads are nearly fully utilised until the end of the process.

Most other divide and conquer schemes using thread pools require more inter-thread communication and coordination.

Mcfarland answered 22/6, 2012 at 14:11 Comment(0)

Another important difference seems to be that with F-J, you can do multiple, complex "join" phases. Consider the merge sort from http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html; there would be too much orchestration required to pre-split this work. E.g. you need to do the following things:

  • sort the first quarter
  • sort the second quarter
  • merge the first 2 quarters
  • sort the third quarter
  • sort the fourth quarter
  • merge the last 2 quarters
  • merge the 2 halves

How do you specify that you must do the sorts before the merges which concern them, and so on?

I have been looking at how best to do a certain thing for each item in a list. I think I will just pre-split the list and use a standard ThreadPool. F-J seems most useful when the work cannot be pre-split into enough independent tasks but can be recursively split into tasks which are independent amongst themselves (e.g. sorting the two halves is independent, but merging the two sorted halves into a sorted whole is not); see the sketch below.
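
To illustrate, a recursive merge sort along these lines might look like this (a sketch with a made-up threshold; the merge is the textbook version, not taken from the linked lecture):

import java.util.Arrays;
import java.util.concurrent.RecursiveAction;

class MergeSortTask extends RecursiveAction {
    private static final int THRESHOLD = 1 << 13;
    private final int[] a;
    private final int lo, hi; // sort a[lo..hi)

    MergeSortTask(int[] a, int lo, int hi) {
        this.a = a;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            Arrays.sort(a, lo, hi); // small enough: sort directly
            return;
        }
        int mid = (lo + hi) >>> 1;
        // The two halves are independent, so they can be forked/joined...
        invokeAll(new MergeSortTask(a, lo, mid), new MergeSortTask(a, mid, hi));
        // ...but the merge depends on both, so it runs only after the join.
        merge(mid);
    }

    private void merge(int mid) {
        int[] left = Arrays.copyOfRange(a, lo, mid);
        for (int i = 0, j = mid, k = lo; i < left.length; k++) {
            if (j == hi || left[i] <= a[j]) {
                a[k] = left[i++];
            } else {
                a[k] = a[j++];
            }
        }
    }
}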

Darlleen answered 5/9, 2012 at 16:23 Comment(0)

F/J also has a distinct advantage when you have expensive merge operations. Because it splits into a tree structure, you do only log2(n) levels of merging, as opposed to n sequential merges with linear splitting. (This makes the theoretical assumption that you have as many processors as tasks, but it is still an advantage.) For a homework assignment we had to merge several thousand 2D arrays (all of the same dimensions) by summing the values at each index. With fork/join and P processors, the time approaches log2(n) as P approaches infinity.

1 2 3     7 3 1     8 5 4
4 5 6  +  2 4 3  =  6 9 9
7 8 9     1 1 0     8 9 9
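
A sketch of that tree-shaped reduction, summing a list of equally sized (flattened) arrays pairwise; the class name and base case are invented for illustration:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumMatrices extends RecursiveTask<int[]> {
    private final int[][] ms; // matrices, each flattened to the same length
    private final int lo, hi; // sum ms[lo..hi)

    SumMatrices(int[][] ms, int lo, int hi) {
        this.ms = ms;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected int[] compute() {
        if (hi - lo == 1) {
            return ms[lo].clone();
        }
        int mid = (lo + hi) >>> 1;
        SumMatrices right = new SumMatrices(ms, mid, hi);
        right.fork();                                       // right half in parallel
        int[] sum = new SumMatrices(ms, lo, mid).compute(); // left half in this thread
        int[] other = right.join();
        for (int i = 0; i < sum.length; i++) {
            sum[i] += other[i];                             // one of the log2(n) merge levels
        }
        return sum;
    }
}

// Usage: int[] total = new ForkJoinPool().invoke(new SumMatrices(ms, 0, ms.length));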

Onetime answered 3/12, 2012 at 4:59 Comment(0)

I would like to add a short answer for those who don't have much time to read the long answers. The comparison is taken from the book Applied Akka Patterns:

Your decision as to whether to use a fork-join-executor or a thread-pool-executor is largely based on whether the operations in that dispatcher will be blocking. A fork-join-executor gives you a maximum number of active threads, whereas a thread-pool-executor gives you a fixed number of threads. If threads are blocked, a fork-join-executor will create more, whereas a thread-pool-executor will not. For blocking operations, you are generally better off with a thread-pool-executor because it prevents your thread counts from exploding. More “reactive” operations are better in a fork-join-executor.

Aspectual answered 24/6, 2019 at 16:57 Comment(0)

You would be amazed at Fork/Join performance in applications like a crawler. Here is the best tutorial to learn from.

Fork/Join's logic is very simple: (1) separate (fork) each large task into smaller tasks; (2) process each task in a separate thread (separating those into even smaller tasks if necessary); (3) join the results.

Pantin answered 19/10, 2015 at 15:27 Comment(0)

If the problem is such that we have to wait for other threads to complete (as in the case of sorting an array or summing an array), fork/join should be used, as an Executor (e.g. Executors.newFixedThreadPool(2)) will choke due to its limited number of threads. The fork/join pool will create more threads in this case to cover for the blocked thread and maintain the same parallelism.

Source: http://www.oracle.com/technetwork/articles/java/fork-join-422606.html

The problem with the executors for implementing divide and conquer algorithms is not related to creating subtasks, because a Callable is free to submit a new subtask to its executor and wait for its result in a synchronous or asynchronous fashion. The issue is that of parallelism: When a Callable waits for the result of another Callable, it is put in a waiting state, thus wasting an opportunity to handle another Callable queued for execution.

The fork/join framework added to the java.util.concurrent package in Java SE 7 through Doug Lea’s efforts fills that gap

Source: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked IO or other unmanaged synchronization

public int getPoolSize(): Returns the number of worker threads that have started but not yet terminated. The result returned by this method may differ from getParallelism() when threads are created to maintain parallelism when others are cooperatively blocked.
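
As a quick illustration of that note, the two values can be compared at runtime (a hypothetical snippet; the actual numbers depend on the machine and on what is currently blocked):

import java.util.concurrent.ForkJoinPool;

ForkJoinPool pool = ForkJoinPool.commonPool();
System.out.println("parallelism: " + pool.getParallelism()); // target worker count
System.out.println("pool size:   " + pool.getPoolSize());    // may exceed it while workers
                                                              // are cooperatively blocked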

Bianco answered 20/4, 2016 at 23:33 Comment(0)
