Thread used for Java CompletableFuture composition?

I'm starting to be comfortable with Java CompletableFuture composition, having worked with JavaScript promises. Basically, the composition just schedules the chained commands on the indicated executor. But I'm unsure which thread is running when the composition is performed.

Let's say I have two executors, executor1 and executor2; for simplicity let's say they are separate thread pools. I schedule a CompletableFuture (to use a very loose description):

CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(this::getFoo, executor1);

Then when that is done I transform the Foo to Bar using the second executor:

CompletableFuture<Bar> futureBar = futureFoo.thenApplyAsync(this::fooToBar, executor2);

I understand that getFoo() will be called from a thread in the executor1 thread pool. I understand that fooToBar() will be called from a thread in the executor2 thread pool.

But what thread is used for the actual composition, i.e. after getFoo() finishes and futureFoo is complete, but before the fooToBar() command gets scheduled on executor2? In other words, what thread actually runs the code that schedules the second command on the second executor?

Is the scheduling performed as part of the same thread in executor1 that called getFoo()? If so, would this CompletableFuture composition be equivalent to simply scheduling fooToBar() on executor2 myself from within the first command, i.e. the executor1 task?
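For comparison, the manual alternative described in the last paragraph might look roughly like this sketch. The class name and the int/String stand-ins for getFoo() and fooToBar() are hypothetical; here the thread that schedules the second step is always the executor1 thread, by construction.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ManualChaining {
    // Hypothetical stand-ins for the question's getFoo() and fooToBar().
    static int getFoo() { return 10; }
    static String fooToBar(int foo) { return "bar" + foo; }

    static String run() {
        ExecutorService executor1 = Executors.newSingleThreadExecutor();
        ExecutorService executor2 = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> futureBar = new CompletableFuture<>();
            executor1.execute(() -> {
                try {
                    int foo = getFoo();
                    // The executor1 thread itself hands the next step to executor2.
                    executor2.execute(() -> {
                        try {
                            futureBar.complete(fooToBar(foo));
                        } catch (Throwable t) {
                            futureBar.completeExceptionally(t);
                        }
                    });
                } catch (Throwable t) {
                    futureBar.completeExceptionally(t);
                }
            });
            return futureBar.join();
        } finally {
            executor1.shutdown();
            executor2.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the hand-written version has to propagate failures itself, which thenApplyAsync does for you.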

Hagioscope answered 5/3, 2021 at 16:0 Comment(8)
It's the thread that does the calling; it depends on your code base. But no, it is neither the one from executor1 nor the one from executor2. The entire point of thenApplyAsync is to have determinism in those actions, only. Maybe more details here. - Rufinaruford
"it's the thread that does the calling …" What do you mean by that exactly? Do you mean that if my main() method, running in a thread named main-thread, calls the original CompletableFuture.supplyAsync(), then when that is finished the call to schedule the fooToBar() operation on executor2 will also be made in main-thread? But how can that be, when main-thread has asynchronously gone on its merry way and is now factoring primes or whatever? - Hagioscope
I might have misunderstood your question a bit, sorry. Is your question which thread will schedule the execution on executor2 as part of that thenApplyAsync? - Rufinaruford
Yes, @Eugene, that is exactly my question: "In other words, what thread actually runs the code to schedule the second command on the second executor [as part of thenApplyAsync()]?" - Hagioscope
I'm sure that it's simply the thread that has finished the first future, and I don't see why it should or could be any other. Feel free to read the sources of CompletableFuture and CompletionStage. I just did, and it very much seems like it's the first future... but the code is just terrible to read and I can't pinpoint it, so I don't want to write an answer. - Like
"…the code is just terrible to read and I can't pinpoint it…" Haha, that's exactly why I asked this question. ;) - Hagioscope
@Like And we were both wrong. - Rufinaruford
@Rufinaruford ¯\_(ツ)_/¯ - Like

This is intentionally unspecified. In practice, it is handled by the same code that handles the chained operations when the variants without the Async suffix are invoked, and it exhibits similar behavior.

So when we use the following test code

CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    return "";
}, r -> new Thread(r, "A").start())
.thenAcceptAsync(s -> {}, r -> {
    System.out.println("scheduled by " + Thread.currentThread());
    new Thread(r, "B").start();
});

it will likely print

scheduled by Thread[A,5,main]

as the thread that completed the previous stage was used to schedule the depending action.
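The test above relies on the one-second sleep to make it likely that the dependent action is registered before the first stage completes. A variant that removes the timing dependency, assuming it is acceptable to hold stage 1 open with a second future used as a latch (the class and method names are made up for illustration), should report "A" on OpenJDK; as stated above, the specification does not promise this.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class CompletingThreadSchedules {
    // Returns the name of the thread that called execute() for the dependent action.
    static String schedulingThreadName() {
        CompletableFuture<Void> registered = new CompletableFuture<>();
        CompletableFuture<String> scheduledBy = new CompletableFuture<>();

        // Stage 1 runs on a new thread "A" and stays open until the dependent is attached.
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            registered.join();
            return "";
        }, r -> new Thread(r, "A").start());

        // Executor for stage 2: records which thread hands it the task, then runs it on "B".
        Executor executorB = r -> {
            scheduledBy.complete(Thread.currentThread().getName());
            new Thread(r, "B").start();
        };

        first.thenAcceptAsync(s -> { }, executorB); // first is still incomplete here
        registered.complete(null);                  // now let "A" finish stage 1
        return scheduledBy.join();
    }

    public static void main(String[] args) {
        System.out.println("scheduled by " + schedulingThreadName());
    }
}
```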

However when we use

CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "",
    r -> new Thread(r, "A").start());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
first.thenAcceptAsync(s -> {}, r -> {
    System.out.println("scheduled by " + Thread.currentThread());
    new Thread(r, "B").start();
});

it will likely print

scheduled by Thread[main,5,main]

as by the time the main thread invokes thenAcceptAsync, the first future is already completed and the main thread will schedule the action itself.
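The same observation can be made deterministic by waiting for the first stage with join() instead of sleeping. In this sketch (hypothetical class name), the dependent is attached to an already-completed future, so on OpenJDK the caller of thenAcceptAsync should be the thread that invokes execute.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class RegisteringThreadSchedules {
    // Returns the name of the thread that called execute() for the dependent action.
    static String schedulingThreadName() {
        CompletableFuture<String> first =
            CompletableFuture.supplyAsync(() -> "", r -> new Thread(r, "A").start());
        first.join(); // guarantee stage 1 is complete before attaching the dependent

        CompletableFuture<String> scheduledBy = new CompletableFuture<>();
        Executor executorB = r -> {
            scheduledBy.complete(Thread.currentThread().getName());
            new Thread(r, "B").start();
        };
        first.thenAcceptAsync(s -> { }, executorB); // scheduled by this calling thread
        return scheduledBy.join();
    }

    public static void main(String[] args) {
        System.out.println("scheduled by " + schedulingThreadName()
            + ", caller is " + Thread.currentThread().getName());
    }
}
```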

But that is not the end of the story. When we use

CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5));
    return "";
}, r -> new Thread(r, "A").start());

Set<String> s = ConcurrentHashMap.newKeySet();
Runnable submitter = () -> {
    String n = Thread.currentThread().getName();
    do {
        for(int i = 0; i < 1000; i++)
            first.thenAcceptAsync(x -> s.add(n+" "+Thread.currentThread().getName()),
                Runnable::run);
    } while(!first.isDone());
};
Thread b = new Thread(submitter, "B");
Thread c = new Thread(submitter, "C");
b.start();
c.start();
b.join();
c.join();
System.out.println(s);

It may print not only the combinations B A and C A from the first scenario and B B and C C from the second; on my machine it also reproducibly prints the combinations B C and C B, indicating that an action passed to thenAcceptAsync by one thread got submitted to the executor by the other thread, which was calling thenAcceptAsync with a different action at the same time.

This matches the scenarios for the thread evaluating the function passed to thenApply (without the Async suffix) described in this answer. As said at the beginning, that is what I expected, as both things are likely handled by the same code. But unlike the thread evaluating the function passed to thenApply, the thread invoking the execute method on the Executor is not even mentioned in the documentation. So in theory, another implementation could use an entirely different thread that neither calls a method on the future nor completes it.

Turkic answered 11/3, 2021 at 10:57 Comment(0)

At the end is a simple program that does what your code snippet does and lets you play with it.

The output confirms that the executor you supply is the one that completes the future once the condition it is waiting on is ready (unless you explicitly call complete early enough, in which case that happens in the thread calling complete); the get() on a Future blocks until the Future is finished.

Supply an argument and there are two executors, executor1 and executor2; supply no arguments and there is just one executor. The output is either this (same executor: the steps run sequentially as separate tasks in the same executor):

In thread Thread[main,5,main] - getFoo
In thread Thread[main,5,main] - getFooToBar
In thread Thread[pool-1-thread-1,5,main] - Supplying Foo
In thread Thread[pool-1-thread-1,5,main] - fooToBar
In thread Thread[main,5,main] - Completed

or this (two executors: the steps again run sequentially, but on different executors):

In thread Thread[main,5,main] - getFoo
In thread Thread[main,5,main] - getFooToBar
In thread Thread[pool-1-thread-1,5,main] - Supplying Foo
In thread Thread[pool-2-thread-1,5,main] - fooToBar
In thread Thread[main,5,main] - Completed

Remember: in this example, the tasks submitted to the executors can start immediately in another thread, and getFoo() was called before the program even got to setting up fooToBar.
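The reason getFoo and getFooToBar show up on the main thread is ordinary Java argument evaluation: getFoo() is a method call, and only its result (a Supplier) is passed to supplyAsync. A minimal sketch with a hypothetical factory method and a thread deliberately named "executor1" shows the two steps landing on different threads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class ArgumentEvaluation {
    static final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical factory: runs on whichever thread evaluates the argument expression.
    static Supplier<Integer> makeSupplier() {
        log.add("factory on " + Thread.currentThread().getName());
        return () -> {
            // The returned Supplier runs later, on the executor's thread.
            log.add("supplier on " + Thread.currentThread().getName());
            return 10;
        };
    }

    static List<String> run() {
        log.clear();
        ExecutorService executor1 =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "executor1"));
        try {
            CompletableFuture.supplyAsync(makeSupplier(), executor1).join();
        } finally {
            executor1.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```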

Code follows -

package your.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

public class TestCompletableFuture {
    private static void dumpWhichThread(final String msg) {
        System.err.println("In thread " + Thread.currentThread() + " - " + msg);
    }

    private static final class Foo {
        final int i;
        Foo(int i) {
            this.i = i;
        }
    }

    // Called on the thread that builds the pipeline; the returned Supplier runs later on executor1.
    public static Supplier<Foo> getFoo() {
        dumpWhichThread("getFoo");
        return new Supplier<Foo>() {
            @Override
            public Foo get() {
                dumpWhichThread("Supplying Foo");
                return new Foo(10);
            }
        };
    }

    private static final class Bar {
        final String j;
        public Bar(final String j) {
            this.j = j;
        }
    }

    // Called on the thread that builds the pipeline; the returned Function runs later on executor2.
    public static Function<Foo, Bar> getFooToBar() {
        dumpWhichThread("getFooToBar");
        return new Function<Foo, Bar>() {
            @Override
            public Bar apply(Foo t) {
                dumpWhichThread("fooToBar");
                return new Bar("" + t.i);
            }
        };
    }

    // No arguments: one shared executor. Any argument: two separate executors.
    public static void main(final String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        final TestCompletableFuture obj = new TestCompletableFuture();
        obj.running(args.length == 0);
    }

    private String running(final boolean sameExecutor) throws InterruptedException, ExecutionException, TimeoutException {
        final ExecutorService executor1 = Executors.newSingleThreadExecutor();
        final ExecutorService executor2 = sameExecutor ? executor1 : Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(getFoo(), executor1);
            CompletableFuture<Bar> futureBar = futureFoo.thenApplyAsync(getFooToBar(), executor2);
            // Try putting a complete here before the get ..
            return futureBar.get(50, TimeUnit.SECONDS).j;
        } finally {
            dumpWhichThread("Completed");
            // Without shutdown, the non-daemon pool threads would keep the JVM from exiting.
            executor1.shutdown();
            executor2.shutdown();
        }
    }
}

As for which thread triggers the Bar stage to progress: in the code above, it's the executor1 thread. In general, the thread completing the future (i.e. giving it a value) is what releases whatever depends on it. If you completed futureFoo immediately on the main thread, the main thread would be the one triggering it.

So you have to be careful with this. If you have N things all waiting on future results but use only a single-threaded executor, then the first one scheduled will block that executor until it completes. You can extrapolate to M threads and N futures: it can decay into M threads all stuck waiting, preventing everything else from progressing.
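The case of completing futureFoo by hand can be sketched as follows (hypothetical class name, int/String stand-ins for Foo and Bar). Because the dependent is attached while futureFoo is still incomplete, on OpenJDK the thread that calls complete(...) is the one that ends up calling execute on executor2.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class CompleterSchedules {
    // Returns the name of the thread that handed the fooToBar step to executor2.
    static String schedulingThreadName() {
        CompletableFuture<Integer> futureFoo = new CompletableFuture<>(); // completed by hand below
        CompletableFuture<String> scheduledBy = new CompletableFuture<>();
        Executor executor2 = r -> {
            scheduledBy.complete(Thread.currentThread().getName());
            new Thread(r, "executor2").start();
        };
        CompletableFuture<String> futureBar = futureFoo.thenApplyAsync(foo -> "bar" + foo, executor2);
        futureFoo.complete(10); // this thread completes stage 1 ...
        futureBar.join();
        return scheduledBy.join(); // ... and therefore scheduled stage 2
    }

    public static void main(String[] args) {
        System.out.println("scheduled by " + schedulingThreadName());
    }
}
```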
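One way this starvation shows up, sketched with a single-thread executor: a task blocks on a future whose work is queued behind it on the same executor. The class name and the 500 ms timeout are illustrative assumptions; the timeout only exists so the demo terminates instead of deadlocking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SingleThreadStarvation {
    static String run() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> {
                // Queued on the same executor, behind the task that is running right now.
                CompletableFuture<String> inner = CompletableFuture.supplyAsync(() -> "inner done", single);
                try {
                    return inner.get(500, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    return "timed out"; // the only thread is busy waiting here, so inner can't run
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }, single).join();
        } finally {
            single.shutdown(); // inner still runs once the outer task returns, then the pool stops
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```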

Stereopticon answered 7/3, 2021 at 23:5 Comment(5)
Your examples show which thread executes those futures, not which one schedules them. - Rufinaruford
It shows both: getFoo and getFooToBar are called at the time the scheduling is done, in the main thread (i.e. in the running() method's calls to supplyAsync and thenApplyAsync, the method supplied is called at the time of scheduling). Exactly when they will be executed is a function of the specific Executor used, but it definitely won't happen in the main thread. - Stereopticon
It's not about which thread calls supplyAsync and thenApplyAsync. It's about which thread triggers the second, dependent future when the first one is completed. You're missing the point of this question. - Like
There is no other "magic" thread: executor1 releases the chained execution, just as executor2 would if bar were chained onto something else. - Stereopticon
Something interesting I noticed (I hadn't used CompletableFuture before): if you call complete rather than letting the async task complete, you do get what you expect delivered to futureBar; however, the executor still runs the supplier function. It wasn't quite what I expected, although it is documented at https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html that a CompletableFuture can't cancel the execution, so it makes sense. - Stereopticon
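The behavior described in the last comment can be reproduced with a small sketch (hypothetical class name; the supplier is held back with a second future so that the early complete() is guaranteed to win). The future keeps the value passed to complete, while the supplier still runs to the end and its result is discarded.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class EarlyComplete {
    static String run() {
        ExecutorService executor1 = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> release = new CompletableFuture<>();
        CompletableFuture<Boolean> supplierRan = new CompletableFuture<>();
        try {
            CompletableFuture<String> futureFoo = CompletableFuture.supplyAsync(() -> {
                release.join();            // wait until complete("early") has happened
                supplierRan.complete(true); // the supplier is not cancelled ...
                return "from supplier";     // ... but this result is ignored
            }, executor1);
            futureFoo.complete("early");
            release.complete(null);
            return futureFoo.join() + ", supplier ran=" + supplierRan.join();
        } finally {
            executor1.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```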
