My new team is writing a Java gRPC service, and to ensure that we never block the request thread, we ended up wrapping more or less ALL methods inside a CompletableFuture, even though those endpoints are conceptually a sequential list of operations (no parallelism).
So the code looks something like this (a Java example is available at the end if needed):
methodA()
    methodB()
        methodD() (let's say this one is a 15 ms RPC call)
        methodE()
    methodC()
        methodF() (let's say this one is 5 ms of CPU-intensive work)
        methodG()
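For comparison, here is a minimal synchronous sketch of the same outline, assuming each method simply returns its input plus one as in the annex. The class name SyncFlow and the sleep helper are hypothetical, and Thread.sleep only stands in for the RPC and the CPU work. The caller's thread runs every step in order, which is the baseline the CompletableFuture version is being compared against:

```java
// Hypothetical synchronous version of the call tree: methodA -> (methodB -> D, E), (methodC -> F, G).
// Each step returns input + 1, mirroring the annex.
public class SyncFlow {

    static int methodA(int input) {
        return methodC(methodB(input));
    }

    static int methodB(int input) {
        return methodE(methodD(input));
    }

    static int methodC(int input) {
        return methodG(methodF(input));
    }

    static int methodD(int input) {
        sleep(15); // stand-in for a ~15 ms blocking RPC
        return input + 1;
    }

    static int methodE(int input) {
        return input + 1;
    }

    static int methodF(int input) {
        sleep(5); // stand-in for ~5 ms of CPU work (sleep does not actually use CPU)
        return input + 1;
    }

    static int methodG(int input) {
        return input + 1;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(methodA(10)); // prints 14
    }
}
```

Calling SyncFlow.methodA(10) blocks the caller for roughly 20 ms and returns 14, the same value the annex's chain produces.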
Context:
- In practice the application is much bigger and there are many more layers of functions.
- Each application host needs to handle 1000 QPS, so you can imagine that methodA is called at that rate.
- A few functions make an RPC call that can take 5-30 ms (I/O).
- Very few functions run CPU-intensive work (< 5 ms).
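A rough sizing with Little's law (average requests in flight L = arrival rate λ × average time in the system W), assuming a request's latency is dominated by the 15 ms RPC and the 5 ms of CPU work from the outline, and ignoring queuing:

```latex
L = \lambda W \approx 1000\,\mathrm{s}^{-1} \times (0.015 + 0.005)\,\mathrm{s} = 20
\qquad
L_{\text{worst}} \approx 1000\,\mathrm{s}^{-1} \times (0.030 + 0.005)\,\mathrm{s} = 35
```

Under these assumptions only a few tens of requests are in flight per host at any moment, which is roughly the number of threads a thread-per-request design would need to keep alive.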
Edit 1: After more reading online yesterday, I understand that if, and only if, we are using truly non-blocking HTTP and DB clients (and it doesn't seem like JDBC is non-blocking), this pattern can reduce the total number of threads required. My understanding is that if we have enough memory to keep one thread per request, synchronous code would probably still be the most efficient implementation (it avoids the overhead of switching threads and reloading data), but if we didn't have enough memory to keep that many threads alive, making the whole code non-blocking can reduce the number of threads and thus allow the application to scale to more requests.
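To illustrate the distinction in Edit 1, here is a sketch that fakes a truly non-blocking client with a timer: the hypothetical fakeRpc method returns a CompletableFuture immediately, and a single scheduler thread completes it about 20 ms later, so no thread sits waiting during the "I/O". This is only a stand-in for a real async client, not a real network call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: a hypothetical "non-blocking RPC" simulated with a timer.
// No thread waits during the 20 ms delay; the single scheduler thread
// only wakes up to complete each future when its timer fires.
public class NonBlockingSketch {

    static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "fake-io-thread");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });

    // Names of the threads that completed the fake RPCs.
    static final Set<String> COMPLETING_THREADS = ConcurrentHashMap.newKeySet();

    // Hypothetical async client call: returns immediately, completes ~20 ms later.
    static CompletableFuture<Integer> fakeRpc(int input) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        TIMER.schedule(() -> {
            COMPLETING_THREADS.add(Thread.currentThread().getName());
            result.complete(input + 1);
        }, 20, TimeUnit.MILLISECONDS);
        return result;
    }

    // Starts all calls first, then waits for every one of them.
    static List<Integer> runConcurrently(int requests) {
        List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            inFlight.add(fakeRpc(i));
        }
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        List<Integer> results = new ArrayList<>();
        for (CompletableFuture<Integer> f : inFlight) {
            results.add(f.join());
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> results = runConcurrently(1000);
        System.out.println(results.size() + " results, completed by " + COMPLETING_THREADS);
    }
}
```

Running runConcurrently(1000) keeps 1000 calls in flight at once while COMPLETING_THREADS ends up containing only fake-io-thread. Wrapping a blocking call in supplyAsync, by contrast, would need one waiting thread per in-flight call.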
Question 1: I understand this unblocks the "request thread", but in practice what's the advantage? Are we truly saving CPU time? In the example below, it feels like "some" thread will be alive the whole time anyway (mostly the thread from CompletableFuture.supplyAsync in methodD); it just happens that it's not the same thread as the one that received the initial request.
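One way to see the point in Question 1 is a sketch (hypothetical class and method names) that runs blocking 50 ms calls through supplyAsync on a fixed pool of 2 threads. Since each sleeping call occupies a pool thread, 4 such calls cannot finish in fewer than two rounds of 50 ms. Thread.sleep here stands in for a blocking RPC:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: wrapping a blocking call in supplyAsync does not remove the wait,
// it only moves it onto a pool thread. With 2 pool threads and 4 blocking
// 50 ms calls, at most 2 calls can sleep at a time, so the batch needs ~100 ms.
public class BlockingMovesSketch {

    static long elapsedMillisForBlockingBatch(int poolSize, int calls, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < calls; i++) {
                futures.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        // The pool thread is parked here and unavailable to other tasks
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    }
                    return Thread.currentThread().getName();
                }, pool));
            }
            futures.forEach(CompletableFuture::join);
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long ms = elapsedMillisForBlockingBatch(2, 4, 50);
        System.out.println("4 blocking calls on 2 threads took ~" + ms + " ms");
    }
}
```

The request thread was freed, but the waiting simply moved to the pool threads: some thread is still parked for every blocking call in flight.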
Question 2: Is that pattern truly a "best practice" that all services should follow? Besides making the code a bit harder to read, I feel that since 50+ methods get invoked per request, and 50+ times we call a mix of CompletableFuture .thenCompose() or .supplyAsync(), this would add some overhead. Was CompletableFuture designed to be used that way across the whole code base, in every method?
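A sketch of a leaner variant, assuming only methodD is genuinely asynchronous: the hypothetical stepE/stepF/stepG are plain functions chained with thenApply, so they do not each submit a task to a pool the way supplyAsync does. The second helper shows that, in current OpenJDK implementations, a non-async stage attached to an already-completed future runs immediately on the calling thread, so completedFuture(...).thenCompose(...) wrappers mostly cost extra objects and indirection rather than thread switches:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch (an assumption, not the team's code): only the RPC stand-in returns a
// CompletableFuture; the cheap synchronous steps are plain methods.
public class LeanChainSketch {

    // Names of the threads that ran each kind of step, recorded for inspection.
    static volatile String syncStepThread;
    static volatile String asyncStepThread;

    // Stand-in for methodD's RPC: the only step that hops to another thread.
    static CompletableFuture<Integer> rpcD(int input, ExecutorService io) {
        return CompletableFuture.supplyAsync(() -> {
            asyncStepThread = Thread.currentThread().getName();
            return input + 1;
        }, io);
    }

    // methodE, methodF, methodG as plain synchronous functions.
    static int stepE(int i) { return i + 1; }
    static int stepF(int i) { return i + 1; } // CPU-bound work would go here
    static int stepG(int i) { return i + 1; }

    // stepE..G run on whichever thread completes the RPC future,
    // or on the caller if it has already completed when they are attached.
    static CompletableFuture<Integer> methodA(int input, ExecutorService io) {
        return rpcD(input, io)
                .thenApply(LeanChainSketch::stepE)
                .thenApply(LeanChainSketch::stepF)
                .thenApply(LeanChainSketch::stepG);
    }

    // thenApply on an already-completed future runs the function right away, on this thread.
    static String threadOfThenApplyOnCompletedFuture() {
        CompletableFuture.completedFuture(0).thenApply(i -> {
            syncStepThread = Thread.currentThread().getName();
            return i;
        });
        return syncStepThread;
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newFixedThreadPool(4);
        try {
            System.out.println("result = " + methodA(10, io).join());
            System.out.println("thenApply ran on " + threadOfThenApplyOnCompletedFuture()
                    + ", RPC stand-in ran on " + asyncStepThread);
        } finally {
            io.shutdown();
        }
    }
}
```

methodA(10, io).join() returns 14, matching the annex, with a single thread hop instead of one per method.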
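Annex (Java example):
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;

public void myEndpoint(MyRequest request, StreamObserver<MyResponse> responseObserver) {
    methodA(10)
        // thenAccept, not thenApply: the StreamObserver method is onNext (not next) and returns void
        .thenAccept(result -> {
            responseObserver.onNext(toResponse(result)); // toResponse: hypothetical Integer -> MyResponse mapper, not shown
            responseObserver.onCompleted();
        })
        // Without this, a failure anywhere in the chain would never be reported to the client
        .exceptionally(error -> {
            responseObserver.onError(error);
            return null;
        });
}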
public CompletableFuture<Integer> methodA(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodB)
        .thenCompose(this::methodC)
        .thenApply((i) -> {
            System.out.println("MethodA executed by " + Thread.currentThread().getName() + ": " + i);
            return i;
        });
}
public CompletableFuture<Integer> methodB(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodD)
        .thenCompose(this::methodE)
        .thenApply((i) -> {
            System.out.println("MethodB executed by " + Thread.currentThread().getName() + ": " + i);
            return i;
        });
}
public CompletableFuture<Integer> methodC(Integer input) {
    return CompletableFuture.completedFuture(input)
        .thenCompose(this::methodF)
        .thenCompose(this::methodG)
        .thenApply((i) -> {
            System.out.println("MethodC executed by " + Thread.currentThread().getName() + ": " + i);
            return i;
        });
}
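public CompletableFuture<Integer> methodD(Integer input) {
    // No executor argument, so this normally runs on ForkJoinPool.commonPool(),
    // and the sleep below parks one of its worker threads for the whole call
    return CompletableFuture.supplyAsync(() -> {
        try {
            // Assume it's an RPC call that takes 5-30ms
            Thread.sleep(20);
            System.out.println("MethodD executed by " + Thread.currentThread().getName() + ": " + input);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag before failing
            throw new RuntimeException(e);
        }
        return input + 1;
    });
}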
public CompletableFuture<Integer> methodE(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("MethodE executed by " + Thread.currentThread().getName() + ": " + input);
        return input + 1;
    });
}
public CompletableFuture<Integer> methodF(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            // Let's assume it's CPU-intensive work that takes 2-5ms
            // (Thread.sleep only simulates the latency; unlike real CPU work it does not consume CPU)
            Thread.sleep(5);
            System.out.println("MethodF executed by " + Thread.currentThread().getName() + ": " + input);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag before failing
            throw new RuntimeException(e);
        }
        return input + 1;
    });
}
public CompletableFuture<Integer> methodG(Integer input) {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("MethodG executed by " + Thread.currentThread().getName() + ": " + input);
        return input + 1;
    });
}
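The chain in the annex is strictly sequential. A sketch that rebuilds the same shape, with hypothetical helpers leaf and record replacing the per-method bodies, logs the order in which steps run. Because thenCompose only starts the next stage once the previous future completes, the order is fixed even though the leaves run on common-pool threads:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch: same chain shape as the annex (A -> B -> D, E; A -> C -> F, G),
// recording the order in which each step runs.
public class ExecutionOrderSketch {

    // Thread-safe because steps run on different threads.
    static final List<String> ORDER = new CopyOnWriteArrayList<>();

    static CompletableFuture<Integer> methodA(Integer input) {
        return CompletableFuture.completedFuture(input)
                .thenCompose(ExecutionOrderSketch::methodB)
                .thenCompose(ExecutionOrderSketch::methodC)
                .thenApply(i -> record("A", i));
    }

    static CompletableFuture<Integer> methodB(Integer input) {
        return CompletableFuture.completedFuture(input)
                .thenCompose(i -> leaf("D", i))
                .thenCompose(i -> leaf("E", i))
                .thenApply(i -> record("B", i));
    }

    static CompletableFuture<Integer> methodC(Integer input) {
        return CompletableFuture.completedFuture(input)
                .thenCompose(i -> leaf("F", i))
                .thenCompose(i -> leaf("G", i))
                .thenApply(i -> record("C", i));
    }

    // Leaf steps (D, E, F, G) run via supplyAsync on the common pool, as in the annex.
    static CompletableFuture<Integer> leaf(String name, Integer input) {
        return CompletableFuture.supplyAsync(() -> record(name, input + 1));
    }

    static Integer record(String name, Integer value) {
        ORDER.add(name);
        return value;
    }

    public static void main(String[] args) {
        System.out.println(methodA(10).join() + " " + ORDER); // prints 14 [D, E, B, F, G, C, A]
    }
}
```

methodA(10).join() returns 14 and ORDER ends up as [D, E, B, F, G, C, A]: nothing in the annex runs in parallel; the steps merely hop between threads.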
… methodB and methodC in parallel? And kicking off methodB would then kick off methodD and methodE in parallel? Or are you asking something different? Please explain in detail the order of method execution, and specify which methods are running in parallel and which are running sequentially. – Arletta
… supplyAsync. The operation within the supplier is not running sequentially. – Veronika