Can I use the work-stealing behaviour of ForkJoinPool to avoid a thread starvation deadlock?

A thread starvation deadlock occurs in a normal thread pool when all the threads in the pool are waiting for queued tasks in the same pool to complete. ForkJoinPool avoids this problem by stealing work from other threads' queues from inside the join() call, rather than simply waiting. For example:

private static class ForkableTask extends RecursiveTask<Integer> {
    private final CyclicBarrier barrier;

    ForkableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    protected Integer compute() {
        try {
            barrier.await();
            return 1;
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
}

@Test
public void testForkJoinPool() throws Exception {
    final int parallelism = 4;
    final ForkJoinPool pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
    for (int i = 0; i < parallelism; ++i) {
        forkableTasks.add(new ForkableTask(barrier));
    }

    int result = pool.invoke(new RecursiveTask<Integer>() {
        @Override
        protected Integer compute() {
            for (ForkableTask task : forkableTasks) {
                task.fork();
            }

            int result = 0;
            for (ForkableTask task : forkableTasks) {
                result += task.join();
            }
            return result;
        }
    });
    assertThat(result, equalTo(parallelism));
}

But when using the ExecutorService interface to a ForkJoinPool, work-stealing doesn't seem to occur. For example:

private static class CallableTask implements Callable<Integer> {
    private final CyclicBarrier barrier;

    CallableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public Integer call() throws Exception {
        barrier.await();
        return 1;
    }
}

@Test
public void testWorkStealing() throws Exception {
    final int parallelism = 4;
    final ExecutorService pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
    int result = pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            int result = 0;
            // Deadlock in invokeAll(), rather than stealing work
            for (Future<Integer> future : pool.invokeAll(callableTasks)) {
                result += future.get();
            }
            return result;
        }
    }).get();
    assertThat(result, equalTo(parallelism));
}

From a cursory look at ForkJoinPool's implementation, all the regular ExecutorService APIs are implemented using ForkJoinTasks, so I'm not sure why a deadlock occurs.

Blastoff answered 26/10, 2014 at 18:11 Comment(2)
I don't think work stealing avoids deadlock. Once you're deadlocked, that's it; you can't make progress. Work stealing just avoids unbalanced queues by allowing threads to steal from other queues if their own queue is empty. – Kwan
@Kwan In the implementation of ForkJoinTask, join() tries to run other jobs from the deque rather than stalling, which avoids the deadlock. Since ForkJoinPool.invokeAll() converts the Callables to ForkJoinTasks, I expected that to work too. – Blastoff

You are almost answering your own question. The key is your own statement that "ForkJoinPool avoids this problem by stealing work from other threads from inside the join() call". Whenever the threads are blocked for any reason other than ForkJoinTask.join(), this work stealing does not occur, and the threads just wait and do nothing.

The reason for this is that in Java it is not possible for the ForkJoinPool to prevent its threads from blocking and instead give them something else to work on. The thread itself needs to avoid blocking and instead ask the pool for work it should do. And this is only implemented in the ForkJoinTask.join() method, not in any other blocking method. If you use a Future inside a ForkJoinPool, you will also see the starvation deadlock.
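To make this concrete, here is a small self-contained sketch (my own illustration, not from the original post, with hypothetical names) in which a task blocks on a CountDownLatch instead of join(). With a parallelism-1 pool, the outer task occupies the only worker while waiting for an inner task that can never run, so the outer result never arrives:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StarvationDemo {
    public static void main(String[] args) throws Exception {
        // A single-threaded ForkJoinPool: the outer task occupies the only
        // worker, then blocks on a latch that only the inner task can open.
        final ForkJoinPool pool = new ForkJoinPool(1);
        final CountDownLatch latch = new CountDownLatch(1);

        Future<String> outer = pool.submit(() -> {
            pool.submit(() -> latch.countDown()); // inner task, never gets a thread
            latch.await();  // plain blocking: no work stealing happens here
            return "done";
        });

        try {
            System.out.println(outer.get(2, TimeUnit.SECONDS));
        } catch (TimeoutException e) {
            System.out.println("starvation deadlock: outer task never finished");
        }
    }
}
```

The same structure with ForkJoinTask.fork()/join() would complete, because join() would run the pending inner task instead of stalling.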

Why is work stealing only implemented in ForkJoinTask.join() and not in any other blocking method in the Java API? Well, there are many such blocking methods (Object.wait(), Future.get(), any of the concurrency primitives in java.util.concurrent, I/O methods, etc.), and they have nothing to do with ForkJoinPool, which is just an arbitrary class in the API, so adding special cases to all these methods would be bad design. It could also lead to very surprising and undesired effects. Imagine, for example, a user passing a task to an ExecutorService that waits on a Future, and then finding out that the task hangs for a very long time in Future.get() just because the running thread stole some other (long-running) work item instead of waiting for the Future and continuing immediately after the result is available. Once a thread starts working on another task, it cannot return to the original task until the second task is finished. Thus it is actually a good thing that other blocking methods do not do work stealing. For a ForkJoinTask, this problem does not exist, because it is not important that the primary task is continued as soon as possible; it is only important that all tasks together are handled as efficiently as possible.

It is also not possible to implement your own method for doing work stealing inside a ForkJoinPool, because all the relevant parts are not public.

However, there is actually a second method by which starvation deadlocks can be prevented. This is called managed blocking. It does not use work stealing (to avoid the problem mentioned above), but it also needs the thread that is going to block to actively cooperate with the thread pool. With managed blocking, the thread tells the thread pool that it may block before it calls the potentially blocking method, and also informs the pool when the blocking method is finished. The thread pool then knows that there is a risk of a starvation deadlock, and may spawn additional threads if all of its threads are currently in some blocking operation and there are still other tasks to execute. Note that this is less efficient than work stealing, because of the overhead of the additional threads. If you implement a recursive parallel algorithm with ordinary futures and managed blocking instead of with ForkJoinTask and work stealing, the number of additional threads can get very large (because in the "divide" phase of the algorithm, many tasks will be created and given to threads that immediately block and wait for results from sub-tasks). However, a starvation deadlock is still prevented, and it avoids the problem that a task has to wait a long time because its thread started working on another task in the meantime.

The ForkJoinPool of Java also supports managed blocking. To use this, one needs to implement the interface ForkJoinPool.ManagedBlocker such that the potentially blocking method that the task wants to execute is called from within the block method of this interface. The task then must not call the blocking method directly; instead it calls the static method ForkJoinPool.managedBlock(ManagedBlocker), which handles the communication with the thread pool before and after the blocking. It also works if the current task is not executed within a ForkJoinPool; in that case it simply calls the blocking method directly.
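As an illustration, here is a minimal sketch of a custom ManagedBlocker wrapping a CountDownLatch (the class name LatchBlocker and the latch-based setup are my own, chosen for demonstration). Four tasks block on a latch in a pool with only two workers; because they block via managedBlock, the pool spawns compensation threads and all tasks complete:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class ManagedBlockDemo {
    // Wraps waiting on a CountDownLatch so the ForkJoinPool is informed
    // about the blocking and can spawn a compensation thread.
    static class LatchBlocker implements ForkJoinPool.ManagedBlocker {
        private final CountDownLatch latch;
        LatchBlocker(CountDownLatch latch) { this.latch = latch; }

        @Override
        public boolean block() throws InterruptedException {
            latch.await();                 // the potentially blocking call
            return true;                   // blocking is finished
        }

        @Override
        public boolean isReleasable() {
            return latch.getCount() == 0;  // true if no blocking is needed
        }
    }

    public static void main(String[] args) throws Exception {
        final int tasks = 4;
        final ForkJoinPool pool = new ForkJoinPool(2); // fewer threads than tasks
        final CountDownLatch latch = new CountDownLatch(tasks);

        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            futures.add(pool.submit(() -> {
                latch.countDown();
                // With a bare latch.await() here this would starve: both
                // workers would block and the last two tasks would never run.
                ForkJoinPool.managedBlock(new LatchBlocker(latch));
                return 1;
            }));
        }

        int sum = 0;
        for (Future<Integer> f : futures) {
            sum += f.get();
        }
        System.out.println(sum);
        pool.shutdown();
    }
}
```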

The only place I have found in the Java API (for Java 7) that actually uses managed blocking is the class Phaser. (This class is a reusable synchronization barrier, similar to CyclicBarrier and CountDownLatch, but more flexible and powerful.) So synchronizing with a Phaser inside a ForkJoinPool task uses managed blocking and can avoid starvation deadlocks (but ForkJoinTask.join() is still preferable because it uses work stealing instead of managed blocking). This works regardless of whether you use the ForkJoinPool directly or via its ExecutorService interface. However, it will not work with any other ExecutorService, such as those created by the class Executors, because these do not support managed blocking.

In Scala, the use of managed blocking is more widespread (description, API).

Dwt answered 28/10, 2014 at 20:6 Comment(8)
Thanks for this answer, very comprehensive. One nitpick though: the ForkJoinTask implementation does the same stealing in get() as it does in join(). The deadlock in my question mainly comes from attempting to synchronize without ForkJoinPool.managedBlock() (actually, both examples deadlock on Java 7). Using Phasers instead, both can be made to work. – Blastoff
Perhaps you can clarify how Java's thread blocking state corresponds to a blocking thread in the modern Scala sense. Does every blocking operation wait on a Java monitor lock as described in that link, or does the thread pool keep track of blocking state in some other way? – Meade
@matt I am not sure what you mean by "blocking thread in the modern Scala sense". The connection between State.BLOCKED and managed blocking is only that a thread that knows it might be in the BLOCKED state soon should tell the ForkJoinPool about this upfront by calling managedBlock. The ForkJoinPool does not try to detect whether threads are currently blocked. And if a thread uses managedBlock but is not actually BLOCKED, the ForkJoinPool would still increment the number of threads. – Dwt
Okay. Some descriptions implied ForkJoinPool may or may not spawn a new thread, so I wondered if it has some deeper wisdom for deciding. I guess, according to you, not. In a modern sense, which some people refer to as "reactive programming", a long CPU-intensive task can also be considered blocking for a user-facing application. That's what I meant. I think your answer also covers that case. It's only concerning that there's no bound on the number of threads other than an OutOfMemoryError. That seems to spell trouble: why no upper bound? – Meade
@matt As indicated in my answer, a ForkJoinPool is not meant to be "reactive" with regard to an individual task; it is meant to deliver the final result of all the tasks as soon as possible, and to achieve this it accepts delaying single tasks (when doing work stealing) for the greater good. – Dwt
@matt ForkJoinPool does not guarantee to spawn a new thread as soon as managedBlock is called; it will do so only if necessary. Maybe that is what the descriptions you read meant. – Dwt
@matt It seems ForkJoinPool honors its hard-coded upper limit of 32767 threads even in the case of managed blocking, so there is some limit (although quite high). A more appropriate limit would be hard to define: would you prefer a deadlock over an OutOfMemoryError? I guess the OOM is the less probable problem and is easier to resolve and debug. – Dwt
Obviously the current limit is purely arbitrary, other than implying that an integer is used somewhere behind the scenes. Frankly, I would prefer the limit to be configurable, and that once it is reached there would be a fallback to blocking rather than an exception, or a configurable property to choose between that fallback and an exception. – Meade

I see what you are doing, but I don't know why. The idea of a barrier is that independent threads can wait for each other to reach a common point. You don't have independent threads. Thread pools such as F/J are for data parallelism.

You are doing something more attuned to task parallelism.

The reason F/J continues is that the framework creates "continuation threads" to keep fetching work from the deques when all work threads are waiting.

Goggleeyed answered 26/10, 2014 at 22:19 Comment(2)
The barriers are just there to make sure each task is scheduled on a separate thread. And I don't think "continuation threads" are the answer: if you print out Thread.currentThread().getId(), you'll see that one of the ForkableTasks runs in the same thread as the one that invokes the rest, and that only 4 threads are used in total. – Blastoff
You cannot guarantee which thread processes which task with work stealing. All tasks dump into the same submission queue. Depending on which release you use (Java 7/8), work threads that block are replaced with either "continuation" or "compensation" threads. What you are doing is not the forte of F/J (data parallelism). – Goggleeyed

You're confusing implementation details with contract guarantees. Where in the documentation do you find that join will steal work and prevent deadlock in your situation? Instead, the documentation says:

Method join() and its variants are appropriate for use only when completion dependencies are acyclic; that is, the parallel computation can be described as a directed acyclic graph (DAG). Otherwise, executions may encounter a form of deadlock as tasks cyclically wait for each other.

Your example is cyclic: the tasks calling barrier.await() are dependent on each other.

The documentation further states:

However, this framework supports other methods and techniques (for example the use of Phaser, helpQuiesce(), and complete(V)) that may be of use in constructing custom subclasses for problems that are not statically structured as DAGs.

The documentation for Phaser states:

Phasers may also be used by tasks executing in a ForkJoinPool. Progress is ensured if the pool's parallelismLevel can accommodate the maximum number of simultaneously blocked parties.

This still isn't clear (because it's not explicit in describing the interaction with join), but it might mean that Phaser is designed to work as in your first example.

Lighterage answered 26/11, 2019 at 9:35 Comment(0)
