Java CompletableFuture for sequential code
My new team is writing a Java gRPC service. To ensure that we never block the request thread, we ended up wrapping more or less ALL methods inside a CompletableFuture, even though those endpoints are conceptually a sequential list of operations (no parallelism).

So the code looks something like this (a full Java example is available at the end if needed):

  methodA()
    methodB()
      methodD() (let's say this one is a 15ms RPC call)
      methodE()
    methodC()
      methodF() (let's say this one is a 5ms CPU intensive work)
      methodG()
 

Context:

  • In practice the application is much bigger and there are many more layers of functions
  • Each application host needs to handle 1000 QPS, so you can imagine that methodA is called at that rate
  • Some functions (a few) make an RPC call that can take 5-30ms (IO)
  • Some functions (very few) run CPU-intensive work (< 5ms)

Edit 1: After more reading online yesterday, I understand that this pattern can reduce the total number of threads required, but only if we are using truly non-blocking HTTP and DB clients (and it doesn't seem like JDBC is non-blocking). My understanding is that if we have enough memory to keep one thread per request, synchronous code would still probably be the most efficient implementation (it avoids the overhead of switching threads and reloading their data); but if we didn't have enough memory to keep that many threads alive, then making the whole code non-blocking would reduce the number of threads and thus allow the application to scale to more requests.
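To make the Edit 1 point concrete, here is a minimal sketch of the difference: with a truly non-blocking client, the returned future is completed by a callback, so no thread is parked during the wait. `asyncRpc` is a made-up stand-in for such a client (JDK 9+'s `CompletableFuture.delayedExecutor` plays the role of the I/O completion callback here):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncIoSketch {
    // Made-up stand-in for a truly non-blocking RPC client: the returned future
    // is completed ~20ms later by a timer callback. During those 20ms no thread
    // sits blocked waiting on it.
    public static CompletableFuture<Integer> asyncRpc(int input) {
        return CompletableFuture.supplyAsync(() -> input + 1,
                CompletableFuture.delayedExecutor(20, TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) {
        // Dependent work is chained, not awaited; join() here only keeps the demo alive.
        int out = asyncRpc(10).thenApply(i -> i * 2).join();
        System.out.println(out); // (10 + 1) * 2 = 22
    }
}
```

With a blocking client, by contrast, some thread must hold the call open for those 20ms, which is exactly the situation the question describes.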

Question 1: I understand this unblocks the "request thread", but in practice what's the advantage? Are we truly saving CPU time? In the example below, it feels like "some" thread will be alive the whole time anyway (mostly the thread from CompletableFuture.supplyAsync in methodD); it just happens that it's not the same thread as the one that received the initial request.

Question 2: Is that pattern truly a "best practice" that all services should follow? Beyond making the code a bit harder to read, 50+ methods get invoked per request, and 50+ times we call a mix of CompletableFuture.thenCompose() or .supplyAsync(). It seems like this would add some overhead. Was CompletableFuture designed to be used that way across the whole code base, in every method?

Annex (java example):

  public void myEndpoint(MyRequest request, StreamObserver<MyResponse> responseObserver) {
    methodA(10)
        .thenAccept((response) -> responseObserver.onNext(response));
  }

  public CompletableFuture<Integer> methodA(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodB)
        .thenCompose(this::methodC)
        .thenApply((i) -> {
          System.out.println("MethodA executed by ".concat(Thread.currentThread().getName() + ": " + i));
          return i;
        });
  }

  public CompletableFuture<Integer> methodB(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodD)
        .thenCompose(this::methodE)
        .thenApply((i) -> {
          System.out.println("MethodB executed by ".concat(Thread.currentThread().getName() + ": " + i));
          return i;
        });
  }

  public CompletableFuture<Integer> methodC(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodF)
        .thenCompose(this::methodG)
        .thenApply((i) -> {
          System.out.println("MethodC executed by ".concat(Thread.currentThread().getName() + ": " + i));
          return i;
        });
  }

  public CompletableFuture<Integer> methodD(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        // Assume it's a RPC call that takes 5-30ms
        Thread.sleep(20);
        System.out.println("MethodD executed by ".concat(Thread.currentThread().getName() + ": " + input));
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      return input + 1;
    });
  }

  public CompletableFuture<Integer> methodE(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("MethodE executed by ".concat(Thread.currentThread().getName() + ": " + input));
      return input + 1;
    });
  }

  public CompletableFuture<Integer> methodF(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        // Let's assume it's a CPU intensive work that takes 2-5ms
        Thread.sleep(5);
        System.out.println("MethodF executed by ".concat(Thread.currentThread().getName() + ": " + input));
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      return input + 1;
    });
  }

  public CompletableFuture<Integer> methodG(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("MethodG executed by ".concat(Thread.currentThread().getName() + ": " + input));
      return input + 1;
    });
  }
Massicot answered 5/12, 2022 at 5:38
For question 1, you will reduce the number of threads if you rely on asynchronous I/O, if your RPC client allows for it. For question 3 you should definitely use different thread-pools for CPU-bound and I/O-bound tasks. And if you have tasks that mix the two, you should use a thread-pool that is appropriately sized for it, depending on the percentage of time spent on each. If you are worried about the complexity of async code, maybe have a look at ea-async. – Cabalistic
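A minimal sketch of the separate-pools suggestion above; the pool sizes and the two-step I/O-then-CPU pipeline are illustrative assumptions, not the poster's code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SplitPools {
    // Illustrative sizing: the I/O pool is larger because its threads mostly
    // wait; the CPU pool matches the core count because its threads compute.
    public static int demo(int input) {
        ExecutorService ioPool  = Executors.newFixedThreadPool(16);
        ExecutorService cpuPool =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            return CompletableFuture
                    .supplyAsync(() -> {                   // I/O-bound step on the I/O pool
                        try { Thread.sleep(20); } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        return input + 1;
                    }, ioPool)
                    .thenApplyAsync(i -> i * 2, cpuPool)   // CPU-bound step on the CPU pool
                    .join();                               // joined only for the demo
        } finally {
            ioPool.shutdown();
            cpuPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(10)); // (10 + 1) * 2 = 22
    }
}
```

Keeping the pools separate prevents slow I/O tasks from starving the CPU-bound work of threads, and vice versa.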
Thanks a lot for the response! Quick question: for question 1 you are saying it leads to fewer threads. Looking at my Java example above, I feel like I am just moving the problem from the "request thread" to whatever thread CompletableFuture ends up spawning in methodD, is that not correct? (i.e. methodD's thread needs to remain alive waiting for the IO output). How can the RPC client wait for a response without having any thread alive to listen for the response? Even if it were possible, do you feel like saving that thread is worth all the overhead of CompletableFuture and spawning multiple extra async threads? – Massicot
As with most performance issues, the best thing you can do is measure your throughput and compare between the different options. Your thoughts all sound reasonable, but performance is often counterintuitive, and definitely depends on many factors (e.g. your "some functions are CPU-expensive"). – Slurry
I'm saying it reduces the number of threads if you rely on asynchronous I/O. That's the whole point of this approach. gRPC appears to support it. In that case, no thread is waiting for the response. The JVM will invoke a callback (on another thread) when data is available. Reducing the number of threads usually improves performance by reducing the amount of context switching between those threads (as well as the amount of memory used). – Cabalistic
I don't understand. Are you asking to run methodB and methodC in parallel? And kicking off methodB would then kick off methodD and methodE in parallel? Or are you asking something different? Please explain in detail the order of method execution, and specify which methods are running in parallel, and which are running sequentially. – Arlettaarlette
In this example nothing is running in parallel; all methods run sequentially as described in the Java code. My understanding is that CompletableFuture is used, in our code base, mostly to ensure that we don't block the main thread on IO, but I'm trying to understand what the advantage of not blocking the request thread is. At the end of the day some thread (methodD's CompletableFuture) is waiting for the IO; it just happens that it's not the initial request thread. Do you think CompletableFuture should only be used to achieve parallelism? – Massicot
@HenriLapierre Ahhhh, I understand your question much better now. I would copy and paste your comment into your question; future readers will understand your dilemma much better. I am writing up an answer for you now. – Arlettaarlette
There's a contradiction in your statement "all methods are running sequentially" and the fact that you are using supplyAsync. The operation within the supplier is not running sequentially. – Veronikaveronike
The premise is that threads are a scarce resource, which is not intrinsic to threads but a consequence of using a pool of threads with a configured maximum. The reason today’s frameworks use a pool is that threads, as implemented today, are expensive and creating too many of them can cause significant performance problems.

You wrote

My understanding is that if we have enough memory to keep one thread per request, using a synchronous code would still probably be the most efficient implementation…

which is going in the right direction, but it's important to keep in mind that there might be more constraints than memory. Some operating systems' schedulers become significantly less efficient with a large number of threads, and some may even have a fixed limit on how many threads a process is allowed to create.

So, when you block a thread by waiting for another, you are limiting the parallel processing capabilities of the thread pool. This applies whether you are using, as you put it, a "true non-blocking" API or just any already existing API that returns futures. Submitting your own operations via supplyAsync has no point, as the supplier's code is still executed by a thread, as you correctly pointed out.

But when you have an existing future returned by an operation, you should rather chain dependent processing steps instead of waiting for its result via join and friends. Note that calling join() on existing futures can make things even worse than just blocking threads:

When you call join() on a CompletableFuture, it tries to compensate for the problem. When the caller is a worker thread of a Fork/Join pool, one of two things can happen:

  • Instead of doing nothing, it may try to fetch pending jobs and execute them in-place, similar to awaitQuiescence.
    • In the best case, it will directly pick up the job you just scheduled with supplyAsync (if using the same pool) and execute it, almost as if you executed it without CompletableFuture (just consuming far more stack space).
    • In the worst case, the thread will be busy executing a long running, entirely unrelated job while the job it’s actually waiting for has been completed long ago. Imagine what happens if that unrelated job also calls join.
  • It may end up actually blocking the thread but using ForkJoinPool.managedBlock(…), which may start a new worker thread to ensure that the pool’s configured parallelism is kept. Great to solve the problem of reduced parallelism, but on the other hand, reintroducing the very problem of resource consumption you actually wanted to solve with thread pools.

The worst of all is that you can’t even predict which of the two things will happen.
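To make the chain-don't-join advice concrete, here is a small sketch. `existingAsyncCall` is a hypothetical future-returning API standing in for the kind of operation the answer describes:

```java
import java.util.concurrent.CompletableFuture;

public class ChainVsJoin {
    // Pretend this future comes from an existing async API we don't control.
    public static CompletableFuture<Integer> existingAsyncCall(int input) {
        return CompletableFuture.supplyAsync(() -> input + 1);
    }

    // Blocking style: occupies the calling thread until the result arrives
    // (with the unpredictable join() behavior described above on FJ workers).
    public static int blockingStyle(int input) {
        return existingAsyncCall(input).join() * 2;
    }

    // Chaining style: the multiplication is registered as a dependent stage;
    // no thread waits for the call to finish.
    public static CompletableFuture<Integer> chainedStyle(int input) {
        return existingAsyncCall(input).thenApply(i -> i * 2);
    }

    public static void main(String[] args) {
        System.out.println(blockingStyle(10));       // 22
        System.out.println(chainedStyle(10).join()); // 22 (join here only for the demo)
    }
}
```

Both produce the same value; the difference is purely in whether a thread is held hostage while the first stage runs.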


There are, however, cases where not blocking a request thread by utilizing other threads has a point. Most notably when the response time for the request itself matters and the results of the background computation are delivered independent of the initial response. The most prominent example of this pattern is the event dispatch thread of GUI frameworks which must be kept free of long running operations, to be able to process subsequent user input.


Note that there is a general solution on the way that will make 99% of all future chains obsolete: Virtual Threads, which are in preview in JDK 19, are cheap to create and allow creating one thread per request, just like you envisioned in the quote above. When a virtual thread blocks, it releases the underlying platform thread to the next virtual thread, so there is no reason to hesitate to call join() on any future, even those belonging to "true non-blocking" APIs.

The best way to interoperate with this concept and the status quo is to design methods to not return futures, but to perform the operation in-place. It's still possible to build a future chain when necessary, e.g. by using .thenApplyAsync(this::inPlaceEvalMethod) instead of .thenCompose(this::futureReturningMethod). But at the same time, you can write a plain sequential version that just calls these methods, which can be executed by a virtual thread. In fact, you could even add the plain sequential version today and benchmark both approaches. The results might convince your team members that "not blocking the request thread" is not necessarily an improvement.
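A sketch of that design, using a hypothetical inPlaceEvalMethod as named above: the same plain method serves both the sequential version and, where a caller still wants one, a future chain:

```java
import java.util.concurrent.CompletableFuture;

public class InPlaceDesign {
    // Plain synchronous method: trivially callable from a (virtual) thread
    // per request, no future in its signature.
    public static int inPlaceEvalMethod(int input) {
        return input + 1;
    }

    public static void main(String[] args) {
        // Sequential version: readable, debuggable, one thread per request.
        int direct = inPlaceEvalMethod(inPlaceEvalMethod(10));

        // The exact same logic lifted into a future chain when a caller needs it.
        int chained = CompletableFuture.completedFuture(10)
                .thenApplyAsync(InPlaceDesign::inPlaceEvalMethod)
                .thenApplyAsync(InPlaceDesign::inPlaceEvalMethod)
                .join(); // joined only for the demo

        System.out.println(direct + " " + chained); // 12 12
    }
}
```

Because the method itself is future-free, nothing needs refactoring when the code base later moves to virtual threads and drops the chains.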

Veronikaveronike answered 14/12, 2022 at 10:30
Great answer, thanks a lot! Out of curiosity, in case you know: how do "true non-blocking" operations work / how are they implemented? How is it possible to wait for external IO without having any thread alive? I suspect it relies on some sort of callback with the OS, but I don't get the details. #74734569 – Massicot
In either case, it requires support from the operating system/drivers. It may use a callback or send a signal. It may also feature a mechanism to poll the status of pending I/O, which usually only makes sense if it allows polling the status of multiple jobs at once. – Veronikaveronike
On the first question: there is no need to wrap all intermediate calls into CompletableFutures if they are all sequential. You can just as well wrap the whole chain of sequential calls into one single CompletableFuture:

public void myEndpoint(MyRequest request, StreamObserver<MyResponse> responseObserver) {
    CompletableFuture.supplyAsync(() -> methodA(10))
        .thenAccept((response) -> responseObserver.onNext(response));
}

public int methodA(Integer input) {
    var i = methodC((methodB(input)));
    System.out.println("MethodA executed by ".concat(Thread.currentThread().getName() + ": " + i));
    return i;
}

public int methodB(Integer input) {
    var i = methodE(methodD(input));
    System.out.println("MethodB executed by ".concat(Thread.currentThread().getName() + ": " + i));
    return i;
}

public int methodC(Integer input) {
    var i = methodG(methodF(input));
    System.out.println("MethodC executed by ".concat(Thread.currentThread().getName() + ": " + i));
    return i;
}

public Integer methodD(Integer input) {
    try {
        // Assume it's a RPC call that takes 5-30ms
        Thread.sleep(20);
        System.out.println("MethodD executed by ".concat(Thread.currentThread().getName() + ": " + input));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return input + 1;
}

public int methodE(Integer input) {
    System.out.println("MethodE executed by ".concat(Thread.currentThread().getName() + ": " + input));
    return input + 1;
}

public int methodF(Integer input) {
    try {
        // Let's assume it's a CPU intensive work that takes 2-5ms
        Thread.sleep(5);
        System.out.println("MethodF executed by ".concat(Thread.currentThread().getName() + ": " + input));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return input + 1;
}

public int methodG(Integer input) {
    System.out.println("MethodG executed by ".concat(Thread.currentThread().getName() + ": " + input));
    return input + 1;
}

The result is the same and the main thread does not get blocked. Since there are far fewer CompletableFuture instances, there is less overhead from handing calls over from one thread to another.

Thus for question 2: no, this is not best practice the way your example code is structured. Use CompletableFuture if you must, avoid it otherwise. For example, you need to use CompletableFuture#thenCompose when you don't have control over the API you are calling (i.e. you can't change the return type from CompletableFuture<T> to plain T). Another case is when you want to take advantage of parallelism. But neither is applicable here.
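A sketch of that "compose only where forced" rule; thirdPartyCall and localWork are made-up names. The future-returning API is composed once at the boundary, while our own logic stays plain:

```java
import java.util.concurrent.CompletableFuture;

public class ComposeWhenForced {
    // An API we don't control: it already returns a CompletableFuture.
    public static CompletableFuture<Integer> thirdPartyCall(int input) {
        return CompletableFuture.supplyAsync(() -> input + 1);
    }

    // Our own logic stays a plain method...
    public static int localWork(int input) {
        return input * 2;
    }

    // ...and we only compose at the boundary where the async API forces us to.
    public static CompletableFuture<Integer> handle(int input) {
        return thirdPartyCall(input)                      // can't avoid this future
                .thenApply(ComposeWhenForced::localWork)  // plain code, chained once
                .thenCompose(ComposeWhenForced::thirdPartyCall);
    }

    public static void main(String[] args) {
        System.out.println(handle(10).join()); // ((10 + 1) * 2) + 1 = 23
    }
}
```

Only the two third-party calls introduce futures; localWork never needed one.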

Tanto answered 7/12, 2022 at 20:12
Thanks! Follow-up question 1: In your new code, I understand it unblocks the "request thread" with less effort, but I'm still curious: what did we gain versus not having any CompletableFuture at all? At the end of the day we have the same number of threads running, I guess? Follow-up question 2: For methodD, even if it's a long IO call, you don't see a reason to use another CompletableFuture (i.e. make it async to the rest)? You would move it to a separate thread only if there was a need to do something in parallel during that call? – Massicot
I'm asking the above because the philosophy I was told is to never call .join() or .get(). So basically, as soon as one of the leaf methods creates a CompletableFuture (let's say we were to do it in methodD), this forces every upstream method to also return a CompletableFuture (the only way to avoid that would be to call .get() or .join()). – Massicot
@HenriLapierre, Re. 1: the gain is not blocking the request thread. Whether this is an advantage depends on your application. Say you have multiple endpoints myEndpoint1, myEndpoint2, ...; you can call them all in parallel, getting the results back via the StreamObserver callback. Without using a CompletableFuture you would end up calling one endpoint after another, waiting for each one to come back with its result. – Tanto
@HenriLapierre, Re. 2: I don't see any advantage in using another CompletableFuture. methodD cannot initiate the RPC before its input argument becomes available. Wrapping that method into another CompletableFuture would only add another asynchronous call, with two threads instead of one tied up during the RPC. – Tanto
> never call .join() or .get(): which means eventually consuming the results of the asynchronous computation via a callback. This is exactly what happens with the call to responseObserver.onNext. – Tanto
In principle, the call represented by Thread.sleep(20) in methodD() should actually be an asynchronous call returning a CompletableFuture for this approach to be implemented correctly. This means that methodD() should return a CompletableFuture, as well as all parents in the call hierarchy. This would make it possible to actually free up the thread during methodD()'s RPC call. It follows that all (potentially) non-trivial methods should return CompletableFuture to avoid big refactorings later. – Cabalistic
Your question is a little unclear, and I think your comment phrases the real question better. Copied and pasted below.

In this example nothing is running in parallel, all methods are running sequentially as described in the Java code. My understanding is that CompletableFuture is used, in our code base, mostly to ensure that we don’t block the main thread on IO, but I’m trying to understand what’s the advantage of not blocking the request thread. At the end of the day some thread (MethodD’s CompletableFuture) is waiting for the IO, it just happens that it’s not the initial request thread. Do you think CompletableFuture should only be used to achieve parallelism?

Great question.

When you write plain, sequential code with no CompletableFuture<T>, your code runs synchronously on a single thread. No new threads are made to run your code. However, when you make a CompletableFuture<T> and put a task on it (e.g. via supplyAsync), a couple of things occur.

  • The task given to the CompletableFuture<T> is submitted to an executor (by default the common ForkJoinPool)
  • One of that executor's worker threads picks the task up (a new worker thread may be spawned if none is free)
  • The thread scheduler then jumps back and forth between the main thread and the worker thread when doing work.
    • Now, if your computer has multiple cores, and the number of cores is larger than the number of runnable threads, then the above may not happen. But typically, the number of threads your application uses is way more than 2/4/8, so the point I am making above is almost always true

As you can see, the 3rd bullet is the most important because this is where the biggest benefit of multithreading occurs. The Java scheduler allows you to pause and continue threads on the fly, so that every thread can make some progress over time.

This is powerful because some threads may be waiting on IO work to be completed. A thread that is waiting on IO work is essentially doing nothing and wasting its turn on the CPU core.

By using a Java scheduler, you can minimize (if not eliminate) the time wasted on a core, and quickly switch to a thread that is not waiting on IO work to continue.

And this is probably the big benefit that your teammates are striving for. They want to ensure that all work that is being done wastes as little time as possible on the core. That is the big point.

That said, whether or not they actually succeeded depends on a couple of different questions that I need you to answer.

  1. You mentioned methodB and methodC. Can any of these methods be run in parallel? Does methodB have to fully complete before methodC can be executed? Or can methodC run in parallel to methodB? The same question applies to methodD and methodE, or methodF and methodG. I understand that currently, they run sequentially and wait for each other to finish. That's not my question. I am asking if it is possible for them to run in parallel.

  2. Are you using rate limiting tools like Semaphore anywhere in your code? Ideally, I would limit the scope of your answer to explicit code that your team writes, but if you know for sure that one of your dependencies does it, then feel free to mention it too.

  • If your answer to question 1 is no, then 99% of the time, doing what your team is doing is a terrible idea. The only method that should be in its own separate thread is methodA, but it sounds like you are already doing that.
  • If your answer to question 1 is at least partly yes but question 2 is no, then your teammates are pretty much correct. Over time, try to get an idea of where and when it makes the most sense. But as a first-pass solution? This isn't a horrible idea.
    • If you said that B and C can be parallel, but D and E cannot, then wrapping B and C in CompletableFuture<T> makes sense, but not D and E. They should just be basic sequential Java code. Unless, of course, this is a modular method/class that can be used in other code and might be parallel there. Nuance is required here, but starting with all of them being CompletableFuture<T> isn't a terrible first solution.
  • If your answer to question 1 is at least partly yes and your answer to question 2 is also yes, then you'll have to take a deep dive into your profiler to find the answer. Things like Semaphore are a different type of IO since they are a "context-dependent" tax that you pay depending on the state of your program around you. But since they are a construct that exists inside of your code, they become a dependable and measurable sort of IO that you can build deductions and assumptions off of. To keep my answer short, rate-limiting tools will allow you to make dependable assumptions about your code, and thus, any results from your profiler will be way more useful than they would be elsewhere. methodA should definitely still be in its own separate thread.

So in short.

  • If 1 and 2 are yes, the answer is going to require nuance. Go into your profiler and find out.
  • If 1 is yes but 2 is no, your teammates are right. Change as needed, but go ahead with this solution.
  • If 1 is no, then your teammates are wrong. Make them change.

And for all of these, methodA should be in its own separate thread no matter what.

EDIT - the original poster of the question has confirmed that the answers to questions 1 and 2 are both no. Therefore, the team is wrong and should pull back this change. I will take this opportunity to explain in more detail why their decision is wrong.

As mentioned before, the big utility behind CompletableFuture<T> and other threading tools is that they allow you to do work on specific threads while other specific threads are waiting on some IO operation to finish. This is accomplished by switching between threads.

However, if there is no IO operation being done, then you are not saving time because none of the threads were ever waiting. So you gain nothing by having CompletableFuture<T>. And worse yet, you actually lose performance by doing this.

See, when switching between threads like I just mentioned, the system has to perform a context switch: it saves the state of the thread being paused, loads the state of the thread being resumed, and the new thread then typically starts with cold CPU caches. A context switch is fast, but it's not instantaneous. It costs you performance to switch threads like that.

And to make matters worse, your teammates put this on every method. So not only are they slowing down their code by a nontrivial amount just to pointlessly spin their wheels, but they are doing it extremely frequently, since this deoptimization occurs on every method.

I would confront your team immediately and point out how damaging this is. They are explicitly in the wrong for doing this, and even if they were preparing for some inevitable future, this is still a terrible time to implement it. Wait until that time comes and build it out as the need arises. As it is now, they are gutting their performance for no good reason.
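A rough way to see the per-method cost is to compare a plain 50-step sequence with the same 50 steps each wrapped in supplyAsync, as in the question's code base. This micro-benchmark sketch is illustrative only (no warm-up, single run), so treat the printed timings as an order-of-magnitude hint rather than a measurement:

```java
import java.util.concurrent.CompletableFuture;

public class OverheadSketch {
    // The "50+ methods", inlined as a plain sequential loop.
    public static int direct(int steps) {
        int i = 0;
        for (int s = 0; s < steps; s++) i += 1;
        return i;
    }

    // The same 50 steps, each hopped through supplyAsync like the question's code.
    public static int perMethodAsync(int steps) {
        CompletableFuture<Integer> f = CompletableFuture.completedFuture(0);
        for (int s = 0; s < steps; s++) {
            f = f.thenCompose(i -> CompletableFuture.supplyAsync(() -> i + 1));
        }
        return f.join();
    }

    public static void main(String[] args) {
        long t0 = System.nanoTime();
        int a = direct(50);
        long t1 = System.nanoTime();
        int b = perMethodAsync(50);
        long t2 = System.nanoTime();
        // Same result either way; only the thread-hopping overhead differs.
        System.out.printf("direct: %d (%,d ns), async: %d (%,d ns)%n",
                a, t1 - t0, b, t2 - t1);
    }
}
```

Both versions compute the same value; the async version pays for 50 task submissions and thread hand-offs to do it.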

Arlettaarlette answered 8/12, 2022 at 20:55
Regarding question 1: no, in general our functions can't run in parallel; the output of one is needed as the input of the other. We do have one very specific function that is a bit more CPU intensive, where CompletableFuture is indeed used to parallelize the work, and that usage makes sense to me. My question was about the rest of the code base, where things need to be executed sequentially but are still executed async. – Massicot
Question 2: No, we don't use Semaphore, and we never call .get() on the future either, since the goal is to be non-blocking. – Massicot
Regarding methodA() being on a separate thread, that makes sense. In practice I believe the Spring Boot/gRPC server already calls us from a different thread for each request, so I don't think we need to do that explicitly ourselves, but if that's not the case, then yeah, it obviously makes sense to spawn a different thread for each request so that we can keep receiving and handling other requests while the first one executes. – Massicot
To partially correct myself on the text you quoted from me, it does seem like there are some IO libraries that are non-blocking (like Apache NIO HttpAsyncClient), and in that case it does seem possible to have no thread alive while waiting for IO (I posted a different question to understand how it works). While that pattern allows reducing the number of threads and the memory footprint, it's still not clear to me, when memory isn't a concern, whether it's truly more CPU efficient than having 1 thread per request. It doesn't seem like a non-blocking MySQL client (DBCP) exists, however. – Massicot
I posted two separate related questions: #74734569 and #74735216 – Massicot
Ok, then there is your answer. Your teammates are wrong, and the decision to have everything as a CompletableFuture<T> is a premature optimization at best, and a performance throttle/maintenance nightmare at worst. I am checking out your other 2 questions now. – Arlettaarlette
Also, I have edited this answer to expand on why exactly your teammates are wrong. I would encourage you to confront them with this information. – Arlettaarlette
