A thread starvation deadlock occurs in a normal thread pool if all the threads in the pool are waiting for queued tasks in the same pool to complete. ForkJoinPool
avoids this problem by stealing work from other threads from inside the join()
call, rather than simply waiting. For example:
private static class ForkableTask extends RecursiveTask<Integer> {
private final CyclicBarrier barrier;
ForkableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
protected Integer compute() {
try {
barrier.await();
return 1;
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
@Test
public void testForkJoinPool() throws Exception {
final int parallelism = 4;
final ForkJoinPool pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
for (int i = 0; i < parallelism; ++i) {
forkableTasks.add(new ForkableTask(barrier));
}
int result = pool.invoke(new RecursiveTask<Integer>() {
@Override
protected Integer compute() {
for (ForkableTask task : forkableTasks) {
task.fork();
}
int result = 0;
for (ForkableTask task : forkableTasks) {
result += task.join();
}
return result;
}
});
assertThat(result, equalTo(parallelism));
}
But when using the ExecutorService
interface to a ForkJoinPool
, work-stealing doesn't seem to occur. For example:
private static class CallableTask implements Callable<Integer> {
private final CyclicBarrier barrier;
CallableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public Integer call() throws Exception {
barrier.await();
return 1;
}
}
@Test
public void testWorkStealing() throws Exception {
final int parallelism = 4;
final ExecutorService pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
int result = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
// Deadlock in invokeAll(), rather than stealing work
for (Future<Integer> future : pool.invokeAll(callableTasks)) {
result += future.get();
}
return result;
}
}).get();
assertThat(result, equalTo(parallelism));
}
From a cursory look at ForkJoinPool
's implementation, all the regular ExecutorService
APIs are implemented using ForkJoinTask
s, so I'm not sure why a deadlock occurs.
ForkJoinTask
,join()
tries to run other jobs from the deque rather than stalling, which avoids the deadlock. SinceForkJoinPool.invokeAll()
converts theCallable
s toForkJoinTask
s, I expected that to work too. – Blastoff