In which thread do CompletableFuture's completion handlers execute?
Asked Answered
A

5

52

I have a question about CompletableFuture method:

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)

The thing is the JavaDoc says just this:

Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion.

What about threading? In which thread is this going to be executed? What if the future is completed by a thread pool?

Angulate answered 5/9, 2017 at 17:30 Comment(0)
I
33

The policies as specified in the CompletableFuture docs could help you understand better:

  • Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

  • All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.

Update: I would also advice on reading this answer by @Mike as an interesting analysis further into the details of the documentation.

Interstadial answered 5/9, 2017 at 17:41 Comment(1)
For methods like thenApply, thenRun what's expained in the doc is clear engough. But what about allOf, for something like fa = CompletableFuture.allOf(f0, f1, f2); fa.thenRun(someRunnable), assume f0, f1, f2 are completed in thread A, thread B, thread C respectively, then which thread will someRunnable be executed in? Again, what about thenCompose(Function<? super T,? extends CompletionStage<U>> fn) in case like f0.thenCompose(x -> someNewCompletionStageProducer).thenRun(someRunnable), someRunnable will be executed in thread of f0 or the future returned by fn? @InterstadialGiovannigip
D
63

As @nullpointer points out, the documentation tells you what you need to know. However, the relevant text is surprisingly vague, and some of the comments (and answers) posted here seem to rely on assumptions that aren't supported by the documentation. Thus, I think it's worthwhile to pick it apart. Specifically, we should read this paragraph very carefully:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

Sounds straightforward enough, but it's light on details. It seemingly deliberately avoids describing when a dependent completion may be invoked on the completing thread versus during a call to a completion method like thenApply. As written, the paragraph above is practically begging us to fill in the gaps with assumptions. That's dangerous, especially when the topic concerns concurrent and asynchronous programming, where many of the expectations we've developed as programmers get turned on their head. Let's take a careful look at what the documentation doesn't say.

The documentation does not claim that dependent completions registered before a call to complete() will run on the completing thread. Moreover, while it states that a dependent completion might be invoked when calling a completion method like thenApply, it does not state that a completion will be invoked on the thread that registers it (note the words "any other").

These are potentially important points for anyone using CompletableFuture to schedule and compose tasks. Consider this sequence of events:

  1. Thread A registers a dependent completion via f.thenApply(c1).
  2. Some time later, Thread B calls f.complete().
  3. Around the same time, Thread C registers another dependent completion via f.thenApply(c2).

Conceptually, complete() does two things: it publishes the result of the future, and then it attempts to invoke dependent completions. Now, what happens if Thread C runs after the result value is posted, but before Thread B gets around to invoking c1? Depending on the implementation, Thread C may see that f has completed, and it may then invoke c1 and c2. Alternatively, Thread C may invoke c2 while leaving Thread B to invoke c1. The documentation does not rule out either possibility. With that in mind, here are assumptions that are not supported by the documentation:

  1. That a dependent completion c registered on f prior to completion will be invoked during the call to f.complete();
  2. That c will have run to completion by the time f.complete() returns;
  3. That dependent completions will be invoked in any particular order (e.g., order of registration);
  4. That dependent completions registered before f completes will be invoked before completions registered after f completes.

Consider another example:

  1. Thread A calls f.complete();
  2. Some time later, Thread B registers a completion via f.thenApply(c1);
  3. Around the same time, Thread C registers a separate completion via f.thenApply(c2).

If it is known that f has already run to completion, one might be tempted to assume that c1 will be invoked during f.thenApply(c1) and that c2 will be invoked during f.thenApply(c2). One might further assume that c1 will have run to completion by the time f.thenApply(c1) returns. However, the documentation does not support these assumptions. It may be possible that one of the threads calling thenApply ends up invoking both c1 and c2, while the other thread invokes neither.

A careful analysis of the JDK code could determine how the hypothetical scenarios above might play out. But even that is risky, because you may end up relying on an implementation detail that is (1) not portable, or (2) subject to change. Your best bet is not to assume anything that's not spelled out in the javadocs or the original JSR spec.

tldr: Be careful what you assume, and when you write documentation, be as clear and deliberate as possible. While brevity is a wonderful thing, be wary of the human tendency to fill in the gaps.

Domingo answered 5/9, 2017 at 20:31 Comment(11)
Interesting analysis - really digs into the intricacies of implementation promises in the realm of concurrent programming.Afoot
@BoristheSpider Thanks! I found the documentation to be carefully worded yet curiously vague, which sent me into "paranoid analysis" mode. I was also amused that the accepted answer made no attempt to decipher or otherwise expand upon the quoted Javadocs, which was probably wise :).Domingo
It seems that in the past, I should have asked myself what “completion method” actually means when I read that documentation. “A careful analysis of the JDK code” leads to the conclusion that most of the surprising scenarios you describe, are indeed possible. So the risk to rely on implementation details is rather low. The fact that two independent actions have no ordering, hence, are not executed in the order they were registered, has been discussed here already, though that didn’t even require the more surprising scenarios you describe.Chinua
@Chinua I rather dislike the way they use 'completion' to describe a task that runs upon completion of its antecedent. Because that word appears rather often when discussing futures ("completion of", "having run to completion", etc.), it's easy to gloss over or misinterpret it in contexts like that javadoc excerpt. I would have preferred they use 'continuation' instead.Domingo
Yeah, when I read it the first time, I thought that “a completion method” meant either of complete, completeExceptionally, cancel, or obtrude… as these complete rather than chain or define or continuation…Chinua
@Chinua I think it generally refers to those, and concurrent calls to complete and cancel, for example, imply a similar race to the one I described between complete and thenApply. It's possible that "completion method" in the quoted excerpt was not meant to directly include chaining methods like thenApply. However, those chaining methods lead to tryFire being called, and the code comments explicitly identify tryFire as a "completion method", so they would still be transitively covered. The more I look at this, the more I think the documentation is just terrible.Domingo
I don't fully understand your answer. I appreciate you highlighting what the specification does not guarantee, but can you please clarify the other side of the coin as well? Specifically, can you provide examples of which threads are allowed to execute non-async completions? I am having a hard time imagining what these can be. Thank you.Skit
That is the point of the answer. The doc outlined a few choices the implementation can freely choose from wherever it sees fit: any thread that registers the thenApply, or thread that completes the completionStage. You can't assume what the implementation does.Stahl
If you really need to specify which thread to run the thenApply, then use thenApplyAsync. It is the same thing except you can control which executor to use.Stahl
or by any other caller of a completion method does this even include callers of such methods on other, unrelated CompletableFuture objects?Belize
@Belize no, it does not apply to entirely unrelated futures.Chinua
I
33

The policies as specified in the CompletableFuture docs could help you understand better:

  • Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

  • All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.

Update: I would also advice on reading this answer by @Mike as an interesting analysis further into the details of the documentation.

Interstadial answered 5/9, 2017 at 17:41 Comment(1)
For methods like thenApply, thenRun what's expained in the doc is clear engough. But what about allOf, for something like fa = CompletableFuture.allOf(f0, f1, f2); fa.thenRun(someRunnable), assume f0, f1, f2 are completed in thread A, thread B, thread C respectively, then which thread will someRunnable be executed in? Again, what about thenCompose(Function<? super T,? extends CompletionStage<U>> fn) in case like f0.thenCompose(x -> someNewCompletionStageProducer).thenRun(someRunnable), someRunnable will be executed in thread of f0 or the future returned by fn? @InterstadialGiovannigip
R
10

From the Javadoc:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

More concretely:

  • fn will run during the call to complete() in the context of whichever thread has called complete().

  • If complete() has already finished by the time thenApply() is called, fn will be run in the context of the thread calling thenApply().

Reenareenforce answered 5/9, 2017 at 17:36 Comment(0)
J
6

When it comes to threading the API documentation is lacking. It takes a bit of inference to understand how threading and futures work. Start with one assumption: the non-Async methods of CompletableFuture do not spawn new threads on their own. Work will proceed under existing threads.

thenApply will run in the original CompletableFuture's thread. That's either the thread that calls complete(), or the one that calls thenApply() if the future is already completed. If you want control over the thread—a good idea if fn is a slow operation—then you should use thenApplyAsync.

Judie answered 5/9, 2017 at 17:33 Comment(2)
Not quite clear original thread. What if the the future is completed by a standalone thread pool? For instance, we execute some computation in the pool and when it's finished just call CompletableFuture::complete.Angulate
Also note the corner case where the CompletableFuture completes before the thenApply call returns - in this case, because the CompletableFuture is completed; it will execute on the current thread.Afoot
G
1

I know this question is old, but I want to use source code to explain this question.

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

private CompletableFuture<Void> uniAcceptStage(Executor e,
                                               Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    Object r;
    if ((r = result) != null)
        return uniAcceptNow(r, e, f);
    CompletableFuture<Void> d = newIncompleteFuture();
    unipush(new UniAccept<T>(e, d, this, f));
    return d;
}

This is the source code from java 16, and we can see, if we trigger thenAccept, we will pass a null executor service reference into our function. From the 2nd function uniAcceptStage() 2nd if condition. If result is not null, it will trigger uniAcceptNow()

if (e != null) {
     e.execute(new UniAccept<T>(null, d, this, f));
} else {
     @SuppressWarnings("unchecked") T t = (T) r;
     f.accept(t);
     d.result = NIL;
}

if executor service is null, we will use lambda function f.accept(t) to execute it. If we are triggering this thenApply/thenAccept from main thread, it will use main thread as executing thread.

But if we cannot get previous result from last completablefuture, we will push our current UniAccept/Apply into stack by using uniPush function. And UniAccept class has tryFire() which will be triggered from our postComplete() function

final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (STACK.compareAndSet(f, h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                NEXT.compareAndSet(h, t, null); // try to detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}
Gogh answered 16/1, 2022 at 22:2 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.