Why does ForkJoinPool::invoke() block the main thread?

Disclaimer: It's the first time I'm using Java's Fork-Join framework, so I'm not 100% sure I'm using it correctly. Java is also not my main programming language, so this could also be relevant.


Given the following SSCCE:

import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

class ForkCalculator extends RecursiveAction
{
    private final Integer[] delayTasks;

    public ForkCalculator(Integer[] delayTasks)
    {
        this.delayTasks = delayTasks;
    }

    @Override
    protected void compute()
    {
        if (this.delayTasks.length == 1) {
            this.computeDirectly();
            return;
        }

        Integer halfway = this.delayTasks.length / 2;

        ForkJoinTask.invokeAll(
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, 0, halfway)
            ),
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
            )
        );
    }

    private void computeDirectly()
    {
        Integer delayTask = this.delayTasks[0];

        try {
            Thread.sleep(delayTask);
        } catch (InterruptedException ex) {
            System.err.println(ex.getMessage());
            System.exit(2);
        }

        System.out.println("Finished computing task with delay " + delayTask);
    }
}

public final class ForkJoinBlocker
{
    public static void main(String[] args)
    {
        ForkCalculator calculator = new ForkCalculator(
            new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
        );

        ForkJoinPool pool = new ForkJoinPool(
            Runtime.getRuntime().availableProcessors()
        );

        pool.invoke(calculator);

        //make it a daemon thread
        Timer timer = new Timer(true);

        timer.scheduleAtFixedRate(
            new TimerTask() {
                @Override
                public void run()
                {
                    System.out.println(pool.toString());
                }
            },
            100,
            2000
        );
    }
}

So I create a ForkJoinPool to which I submit some tasks which do some processing. I replaced them with Thread.sleep() for the purposes of this example, to keep it simple.

In my actual program, this is a very long list of tasks, so I want to periodically print the current status on the standard-output. I try to do that on a separate thread using a scheduled TimerTask.

However, I noticed something that I wasn't expecting: in my example the output is something like:

Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......

Which means the "status-task" is never executed.

However, if I move pool.invoke(calculator); to the very end of main(), then it works as expected:

java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......

The only conclusion I can draw is that ForkJoinPool::invoke() blocks the main-thread (it only returns AFTER all the tasks in the pool are finished).

I expected the code in the main-thread to continue to be executed, while the tasks in the fork-join-pool are handled asynchronously.

My question is: does this happen because I used the framework incorrectly? Is there something I have to correct in my code?

I noticed that one of ForkJoinPool's constructors has a boolean asyncMode parameter but, from what I can tell from the implementation, this only decides between the FIFO_QUEUE and LIFO_QUEUE execution modes (I'm not exactly sure what those are):

public ForkJoinPool(
    int parallelism,
    ForkJoinWorkerThreadFactory factory,
    UncaughtExceptionHandler handler,
    boolean asyncMode
) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
Sontag answered 1/10, 2018 at 13:2 Comment(6)
Did you read the docs for invoke? - Vanquish
@Vanquish I did... any particular hint you are inclining towards? - Autotransformer
Are you saying I should use execute() instead? Oracle's own docs about this uses invoke() ( docs.oracle.com/javase/tutorial/essential/concurrency/… ), which is why I didn't try changing it. - Sontag
The first part: "Performs the given task, returning its result upon completion." It says right there it doesn't return until the tasks are complete. - Vanquish
for someone fairly new to java, this is a very good question.. definitely have my upvote - Autotransformer
you btw could accept this correct answer here... - Autotransformer
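
On the asyncMode question raised above: as far as the ForkJoinPool documentation describes it, asyncMode only selects FIFO instead of the default stack-like (LIFO) order for forked tasks that are never joined. It does not make invoke() asynchronous. The sketch below is an illustration under that assumption: SleepTask and blockedMillis are hypothetical names, and the 200 ms delay is arbitrary. It measures how long invoke() holds the calling thread in each mode.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public final class AsyncModeSketch
{
    // Hypothetical task for illustration: sleeps, then returns a fixed value.
    static final class SleepTask extends RecursiveTask<Integer>
    {
        private final long millis;

        SleepTask(long millis)
        {
            this.millis = millis;
        }

        @Override
        protected Integer compute()
        {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            return 42;
        }
    }

    // Calls invoke() on a pool built with the given asyncMode and returns
    // roughly how long the calling thread was blocked, in milliseconds.
    static long blockedMillis(boolean asyncMode)
    {
        ForkJoinPool pool = new ForkJoinPool(
            2,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null,       // no custom UncaughtExceptionHandler
            asyncMode
        );
        try {
            long start = System.nanoTime();
            pool.invoke(new SleepTask(200));
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args)
    {
        System.out.println("asyncMode=false: blocked ~" + blockedMillis(false) + " ms");
        System.out.println("asyncMode=true:  blocked ~" + blockedMillis(true) + " ms");
    }
}
```

Both calls should report a wait close to the task's own duration, which suggests that the blocking comes from invoke() itself and not from the pool's queueing mode.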

invoke() waits for the entire task to finish before returning, so yes, it blocks the main thread. By the time the Timer is scheduled, all the work is already done; main() then returns, and because the Timer runs on a daemon thread it cannot keep the JVM alive, so the status task never gets a chance to run.

You can simply use execute() instead of invoke(): it submits the task and returns immediately, so the task runs asynchronously. You can then join() on the ForkJoinTask to wait for the result, during which the Timer keeps running:

ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.execute(calculator);

// make it a daemon thread
Timer timer = new Timer(true);

timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        System.out.println(pool.toString());
    }
}, 100, 2000);

calculator.join(); // wait for computation
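
If a Timer feels like too much machinery, one possible alternative is to let the main thread do the reporting itself: submit with execute(), then poll isDone() in a loop. This is only a sketch of that idea, not part of the original code. SleepAction, runAndReport, and the chosen delays are made up for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public final class PollingStatusSketch
{
    // Hypothetical stand-in for the real work: sleeps for the given time.
    static final class SleepAction extends RecursiveAction
    {
        private final long millis;

        SleepAction(long millis)
        {
            this.millis = millis;
        }

        @Override
        protected void compute()
        {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Submits the action asynchronously, prints the pool status until the
    // action is done, and returns how many status lines were printed.
    static int runAndReport(long workMillis, long reportEveryMillis)
    {
        ForkJoinPool pool = new ForkJoinPool(2);
        SleepAction action = new SleepAction(workMillis);
        int reports = 0;
        try {
            pool.execute(action); // returns immediately
            while (!action.isDone()) {
                System.out.println(pool);
                reports++;
                try {
                    Thread.sleep(reportEveryMillis);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            action.join(); // waits if polling was cut short; rethrows a task failure
        } finally {
            pool.shutdown();
        }
        return reports;
    }

    public static void main(String[] args)
    {
        int reports = runAndReport(300, 100);
        System.out.println("Printed " + reports + " status line(s)");
    }
}
```

The trade-off is that the main thread is tied up in the polling loop, whereas the Timer approach leaves it free for other work until join() is called.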
Prendergast answered 1/10, 2018 at 13:28 Comment(3)
Hmmm. So invoke(); == execute(); join(); ? Am I understanding this correctly? - Sontag
@RaduMurzea exactly. The last two lines of invoke() are: externalSubmit(task); return task.join(); - Autotransformer
You could also use pool.submit(task), which returns a ForkJoinTask<T> (a subtype of Future<T>); calling get() on it returns the result, blocking if the submitted task hasn't completed yet. - Schechinger
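
To make the comparison in these comments concrete, here is a small sketch that runs the same computation through invoke(), execute() followed by join(), and submit() followed by get(). SumTask and the helper method names are hypothetical and exist only for this illustration; the ForkJoinPool and ForkJoinTask calls are the standard ones discussed above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public final class SubmissionStylesSketch
{
    private static final ForkJoinPool POOL = new ForkJoinPool(2);

    // Hypothetical task: sums the integers in [from, to) by splitting in half.
    static final class SumTask extends RecursiveTask<Long>
    {
        private final int from; // inclusive
        private final int to;   // exclusive

        SumTask(int from, int to)
        {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute()
        {
            if (to - from <= 1_000) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            SumTask left = new SumTask(from, mid);
            SumTask right = new SumTask(mid, to);
            left.fork();                          // run the left half asynchronously
            return right.compute() + left.join(); // compute the right half here
        }
    }

    // Blocks the caller until the result is ready.
    static long viaInvoke(int n)
    {
        return POOL.invoke(new SumTask(0, n));
    }

    // Submits without blocking; the caller blocks only at join().
    static long viaExecuteJoin(int n)
    {
        SumTask task = new SumTask(0, n);
        POOL.execute(task);
        // ... the calling thread is free to do other work here ...
        return task.join();
    }

    // Like execute(), but returns the task as a Future; get() has checked exceptions.
    static long viaSubmitGet(int n)
    {
        ForkJoinTask<Long> future = POOL.submit(new SumTask(0, n));
        try {
            return future.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        }
    }

    public static void main(String[] args)
    {
        System.out.println(viaInvoke(100_000));
        System.out.println(viaExecuteJoin(100_000));
        System.out.println(viaSubmitGet(100_000));
    }
}
```

All three helpers should return the same value. The practical differences are when the calling thread blocks and how failures surface: join() rethrows unchecked exceptions, while get() wraps them in an ExecutionException.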
