How to run blocking codes on another thread and make http request return immediately
Asked Answered
A

1

2

We started a new project with Quarkus and Mutiny, and created a bunch of endpoints with Quarkus @Funq, everything has been working fine so far. Now we want to process something very time-consuming in one of the endpoints, and what we are expecting is, once user clicks a button to send the http request from frontend and hits this specific endpoint, we are going to return 202 Accepted immediately, leaving the time-consuming operation processing in another thread from backend, then send notification email accordingly to user once it completes.

I understand this can be done with @Async or CompletableFuture, but now we want to do this with Mutiny. Based on how I read Mutiny documentation here https://smallrye.io/smallrye-mutiny/guides/imperative-to-reactive, runSubscriptionOn will avoid blocking the caller thread by running the time-consuming method on another thread, and my testing showed the time-consuming codes did get executed on a different thread. However, the http request does not return immediately, it is still pending until the time-consuming method finishes executing (as I observe in the browser's developer tool). Did I misunderstand how runSubscriptionOn works? How do I implement this feature with Mutiny?

My @Funq endpoint looks like this

@Inject
MyService myService;

@Funq("api/report")
public Uni<String> sendReport(MyRequest request) {
    ExecutorService executor = Executors.newFixedThreadPool(10, r -> new Thread(r, "CUSTOM_THREAD"));

    return Uni.createFrom()
        .item(() -> myService.timeConsumingMethod(request))
        .runSubscriptionOn(executor);
} 

Edit: I found the solution using Uni based on @Ladicek's answer. After digging deeper into Quarkus and Uni I have a follow-up question:

Currently most of our blocking methods are not returning Uni on Service level, instead we create Uni object from what they return (i.e. object or list), and return the Uni on Controller level in their endpoints like this

return Uni.createFrom().item(() -> myService.myIOBlockingMethod(request)).

As @Ladicek explained, I do not have to use .runSubscriptionOn explicitly as the IO blocking method will automatically run on a worker thread (as my method on Service level does not return Uni). Is there any downside for this? My understanding is, this will lead to longer response time because it has to jump between the I/O thread and worker thread, am I correct?

What is the best practice for this? Should I always return Uni for those blocking methods on Service level so that they can run on the I/O threads as well? If so I guess I will always need to call .runSubscriptionOn to run it on a different worker thread so that the I/O thread is not blocked, correct?

Athena answered 24/9, 2021 at 22:30 Comment(0)
R
1

By returning a Uni, you're basically saying that the response is complete when the Uni completes. What you want is to run the action on a thread pool and return a complete response (Uni or not, that doesn't matter).

By the way, you're creating an extra thread pool in the method, for each request, and don't shut it down. That's wrong. You want to create one thread pool for all requests (e.g. in a @PostConstruct method) and ideally also shut it down when the application ends (in a @PreDestroy method).

Riches answered 25/9, 2021 at 15:18 Comment(7)
Thanks! The thread pool I created is only for testing/demo purpose so that I was able to verify myService.timeConsumingMethod(request) did get executed on a different thread rather than the main thread when using runSubscriptionOn as described in Mutiny documentation . What I do not understand is, why the response does not return immediately, even with the time-consuming method being executed on a different thread? I am new to Mutiny and would appreciate if you could be more specific to help me understand this.Athena
Please correct me if I'm wrong, thanks! So returning a Uni frees up the thread that was processing the request to handle other requests while waiting for the async response, but the client still need to wait for the result, am I correct? If so, what's the point of runSubscriptionOn to run blocking codes on another thread? Isn't it unblocking already with Uni itself?Athena
I guess you probably need to do some reading about reactive. You can start at quarkus.io/guides/getting-started-reactive or quarkus.io/guides/quarkus-reactive-architecture To your question, I have to repeat that if you return a Uni, it means that the response completes when the Uni completes. If your Uni completes after the extra action completes, the client will have to wait.Riches
Thanks for the links, which clarified most of my doubts! Got one confusion afterwards, though...In order to not block the I/O thread I need to return Uni, but from smallrye.io/smallrye-mutiny/guides/imperative-to-reactive it says if the Uni item is created from a IO blocking method (which applies to most of our endpoints, where we call external APIs or access database or access files), then it'll still block the I/O thread, am I understanding this correctly? Should I use .runSubscriptionOn to run them on another thread to unblock the I/O thread? Thanks again!Athena
That is right. If you use RESTEasy Reactive and don't mark the method as @Blocking, then it runs on the event loop thread (sometimes called IO thread). If you need to run a blocking action, you need to offload it to a worker thread. In latest Quarkus versions (I think it's since 2.2), if your method does not return a Uni, it will automatically run on a worker thread. But that would still result in the response being complete when the entire method finishes. It seems to me that you want to submit the blocking action to an executor and return a complete response.Riches
I dug deeper into Quarkus and Mutiny docs and came up with a folllow-up question, would appreciate if you could take a look. I put my question at the very end in my original post, thanks in advance!Athena
If your methods are blocking, you always have to run them on a worker thread. Wrapping into Uni makes no sense. Of course you can wrap the call into Uni and use an operator to execute the Uni on another thread, but that's just a more complex way of expressing the default behavior, so...Riches

© 2022 - 2024 — McMap. All rights reserved.