Disclaimer: It's the first time I'm using Java's Fork-Join framework, so I'm not 100% sure I'm using it correctly. Java is also not my main programming language, so this could also be relevant.
Given the following SSCCE:
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
class ForkCalculator extends RecursiveAction
{
    private final Integer[] delayTasks;

    public ForkCalculator(Integer[] delayTasks)
    {
        this.delayTasks = delayTasks;
    }

    @Override
    protected void compute()
    {
        if (this.delayTasks.length == 1) {
            this.computeDirectly();
            return;
        }
        Integer halfway = this.delayTasks.length / 2;
        ForkJoinTask.invokeAll(
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, 0, halfway)
            ),
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
            )
        );
    }

    private void computeDirectly()
    {
        Integer delayTask = this.delayTasks[0];
        try {
            Thread.sleep(delayTask);
        } catch (InterruptedException ex) {
            System.err.println(ex.getMessage());
            System.exit(2);
        }
        System.out.println("Finished computing task with delay " + delayTask);
    }
}
public final class ForkJoinBlocker
{
    public static void main(String[] args)
    {
        ForkCalculator calculator = new ForkCalculator(
            new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
        );
        ForkJoinPool pool = new ForkJoinPool(
            Runtime.getRuntime().availableProcessors()
        );
        pool.invoke(calculator);
        //make it a daemon thread
        Timer timer = new Timer(true);
        timer.scheduleAtFixedRate(
            new TimerTask() {
                @Override
                public void run()
                {
                    System.out.println(pool.toString());
                }
            },
            100,
            2000
        );
    }
}
So I create a ForkJoinPool to which I submit some tasks that do some processing. I replaced the processing with Thread.sleep() for the purposes of this example, to keep it simple.
In my actual program, this is a very long list of tasks, so I want to periodically print the current status to standard output. I try to do that on a separate thread using a scheduled TimerTask.
However, I noticed something that I wasn't expecting: in my example the output is something like:
Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......
Which means the "status-task" is never executed.
However, if I modify my code to move the pool.invoke(calculator); call to the very end, then it works as expected:
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......
The only conclusion I can draw is that ForkJoinPool::invoke() blocks the main thread (it only returns AFTER all the tasks in the pool are finished).
I expected the code in the main-thread to continue to be executed, while the tasks in the fork-join-pool are handled asynchronously.
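To make the expectation concrete, here is a minimal sketch of the behavior I was hoping for. It assumes that ForkJoinPool.submit() (like execute()) only schedules the task and returns immediately, whereas invoke() waits for completion. AsyncSubmitSketch, SleepAction and submitReturnsBeforeCompletion are hypothetical names made up for this illustration; they are not part of my program.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

public final class AsyncSubmitSketch
{
    // Hypothetical stand-in for ForkCalculator: a single task that just sleeps.
    static final class SleepAction extends RecursiveAction
    {
        private final long millis;

        SleepAction(long millis)
        {
            this.millis = millis;
        }

        @Override
        protected void compute()
        {
            try {
                Thread.sleep(this.millis);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Returns true if submit() handed control back while the task was still running.
    static boolean submitReturnsBeforeCompletion(long millis)
    {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // submit() schedules the task; the calling thread keeps going.
            ForkJoinTask<Void> pending = pool.submit(new SleepAction(millis));
            boolean stillRunning = !pending.isDone();

            // Other work (e.g. starting the status Timer) could happen here.

            // join() blocks, but only at the point where completion is actually needed.
            pending.join();
            return stillRunning && pending.isDone();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args)
    {
        System.out.println("submit() returned early: " + submitReturnsBeforeCompletion(500));
    }
}
```

In my SSCCE this would mean keeping the Timer set-up where it is and replacing pool.invoke(calculator) with a submit() followed by a later join(), so the status task gets a chance to run while the pool is busy.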
My question is: does this happen because I used the framework incorrectly? Is there something I have to correct in my code?
I noticed that one of ForkJoinPool's constructors has a boolean asyncMode parameter but, from what I can tell from the implementation, this only decides between the FIFO_QUEUE and LIFO_QUEUE execution modes (I'm not exactly sure what those are):
public ForkJoinPool(
int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode
) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
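For completeness, a small sketch of how that constructor can be called. As far as I can tell from the Javadoc, asyncMode only selects local first-in-first-out scheduling for forked tasks that are never joined, so I assume it has no effect on whether invoke() blocks the caller. AsyncModeSketch and newPool are hypothetical names used only for this example.

```java
import java.util.concurrent.ForkJoinPool;

public final class AsyncModeSketch
{
    // Builds a small pool with the default worker factory and no exception handler.
    static ForkJoinPool newPool(boolean asyncMode)
    {
        return new ForkJoinPool(
            2,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null, // no UncaughtExceptionHandler
            asyncMode
        );
    }

    public static void main(String[] args)
    {
        ForkJoinPool lifo = newPool(false);
        ForkJoinPool fifo = newPool(true);

        // getAsyncMode() reports which scheduling mode the pool was created with.
        System.out.println("asyncMode=false: getAsyncMode() = " + lifo.getAsyncMode());
        System.out.println("asyncMode=true:  getAsyncMode() = " + fifo.getAsyncMode());

        lifo.shutdown();
        fifo.shutdown();
    }
}
```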
invoke? – Vanquish
execute() instead? Oracle's own docs about this use invoke() (docs.oracle.com/javase/tutorial/essential/concurrency/…), which is why I didn't try changing it. – Sontag