How to build an async REST endpoint that calls a blocking action in a worker thread and replies instantly (Quarkus)

I checked the docs and Stack Overflow but didn't find a suitable approach. For example, this post seems very close: Dispatch a blocking service in a Reactive REST GET endpoint with Quarkus/Mutiny. However, I don't want so much unnecessary boilerplate code in my service; ideally, no service code change at all.

I just want to call a service method that uses the entity manager and is therefore blocking, but return a string such as "query started" to the caller immediately. I don't need a callback object; it's a fire-and-forget approach.

I tried something like this

@NonBlocking
@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public Uni<String> triggerQuery() {
    return Uni.createFrom()
    .item("query started")
    .call(() -> service.startLongRunningQuery());
}

But it does not work. The error message returned to the caller is:

You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.

I actually expected Quarkus to take care of distributing the tasks accordingly, that is, the REST call to an IO thread and the blocking entity manager operations to a worker thread. So I must be using it wrong.

UPDATE:

I also tried a proposed workaround that I found in https://github.com/quarkusio/quarkus/issues/11535, changing the method body to

return Uni.createFrom()
        .item("query started")
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .invoke(()-> service.startLongRunningQuery());

Now I don't get an error, but service.startLongRunningQuery() is not invoked, so there are no logs and no query is actually sent to the database.

Same with (How to call long running blocking void returning method with Mutiny reactive programming?):

return Uni.createFrom()
        .item(() -> service.startLongRunningQuery())
        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());

Same with (How to run blocking codes on another thread and make http request return immediately):

ExecutorService executor = Executors.newFixedThreadPool(10, r -> new Thread(r, "CUSTOM_THREAD"));

return Uni.createFrom()
                .item(() -> service.startLongRunningQuery())
                .runSubscriptionOn(executor);

Any idea why service.startLongRunningQuery() is not called at all and how to achieve fire and forget behaviour, assuming rest call handled via IO thread and service call handled by worker thread?

Coumarin answered 22/2, 2022 at 10:38 Comment(3)
Why do you want to use an async endpoint when it's going to call a sync service? - Pismire
The idea is that the REST endpoint starts a long-running service call but returns an instant reply to the caller, e.g. just the string "submitted". - Coumarin
In that case you should probably just submit the task to an ExecutorService. - Pismire
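
The ExecutorService suggestion in the last comment can be sketched in plain Java roughly as follows. This is only an illustration outside of Quarkus: the class names FireAndForgetSketch and QueryService, the pool size, and the simulated delay are all assumptions made for the example, and inside Quarkus a container-managed executor would likely be preferable to a hand-made pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FireAndForgetSketch {

    // Hypothetical stand-in for the blocking service from the question.
    static class QueryService {
        // Lets a caller observe when the background work has finished.
        final CountDownLatch finished = new CountDownLatch(1);

        void startLongRunningQuery() {
            try {
                Thread.sleep(300); // simulate a slow, blocking database call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finished.countDown();
        }
    }

    // Assumed pool size; tune it to the expected load.
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    final QueryService service = new QueryService();

    // Plays the role of the REST method: hand off the work, reply at once.
    public String triggerQuery() {
        workers.execute(service::startLongRunningQuery);
        return "query started";
    }

    public void shutdown() {
        workers.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        FireAndForgetSketch endpoint = new FireAndForgetSketch();
        System.out.println(endpoint.triggerQuery());
        // Wait for the background task only so the demo can exit cleanly.
        endpoint.service.finished.await(5, TimeUnit.SECONDS);
        endpoint.shutdown();
    }
}
```

The caller gets its reply as soon as the task is queued. Any exception thrown by the task stays on the worker thread, so a real service would need its own logging or error handling.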

It depends on whether you want to return immediately (before your startLongRunningQuery operation has actually run) or wait until the operation completes.

In the first case, use something like:

@Inject EventBus bus;

@NonBlocking
@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public String triggerQuery() {
    // Fire and forget: send() does not wait for the consumer to finish.
    bus.send("some-address", "my payload");
    return "query started";
}

@Blocking // Will be called on a worker thread
@ConsumeEvent("some-address")
public void executeQuery(String payload) {
    service.startLongRunningQuery();
}

In the second case, you need to execute the query on a worker thread.

@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public Uni<String> triggerQuery() {
   // Assumes startLongRunningQuery() returns a String; the supplier runs on a worker thread.
   return Uni.createFrom().item(() -> service.startLongRunningQuery())
      .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
}

Note that you need RESTEasy Reactive for this to work (and not classic RESTEasy). If you use classic RESTEasy, you would need the quarkus-resteasy-mutiny extension (but I would recommend using RESTEasy Reactive, it will be way more efficient).

Indeed answered 23/2, 2022 at 7:23 Comment(6)
The first case describes the solution that works exactly as required, with a minimum of code change. Great! One side note that confused me: I cannot use enums on @ConsumeEvent, as the value is reported to be non-final (?). So I used a constant in a class. - Coumarin
Do you mean for the address? This is a limitation of Java annotations. The workaround is to declare a constant. - Indeed
I just need to pass a List<Long> as the payload, and the EventBus does not seem able to do it: Failed to start application (with profile local): java.lang.ClassNotFoundException: java.util.List<java.lang.Long>. Seriously? - Coumarin
When I try using this, I get an error: com.oracle.graal.pointsto.constraints.UnresolvedElementException: Discovered unresolved method during parsing: regarding the method annotated with @ConsumeEvent. - Marindamarinduque
@Indeed I'm integrating the first proposed solution, with the EventBus and event consumer, into my app. There is one thing I can't get my head around, and I hope you can explain it. The service injected into my resource is request-scoped. How does this work with the EventBus? The original request that adds an event to the bus has already finished. Can that work? - Shipboard
The sender and the receiver, in this case, won't share the same request scope. We do not propagate the request scope on the event bus. - Indeed
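
The constant workaround from the comments above can be illustrated with a small plain-Java sketch. It does not use Quarkus: @Listens is a hypothetical annotation standing in for @ConsumeEvent, and the Address enum, the Addresses class, and the address string are likewise made up for the example. The point is only that annotation values must be compile-time constants, so a method call on an enum is rejected while a static final String is accepted.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class AddressConstantSketch {

    // Hypothetical annotation standing in for @ConsumeEvent (String-valued).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listens {
        String value();
    }

    // An enum like this cannot supply the String value directly.
    enum Address { QUERY }

    // Compile-time String constants are allowed as annotation values.
    static final class Addresses {
        static final String QUERY = "query-address";

        private Addresses() {
        }
    }

    static class Consumer {
        // @Listens(Address.QUERY.name()) would not compile:
        // a method call is not a constant expression.
        @Listens(Addresses.QUERY)
        void executeQuery(String payload) {
            // the blocking work would go here
        }
    }

    // Reads the address back via reflection to show the constant was applied.
    static String addressOf(String methodName) throws NoSuchMethodException {
        Method method = Consumer.class.getDeclaredMethod(methodName, String.class);
        return method.getAnnotation(Listens.class).value();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(addressOf("executeQuery"));
    }
}
```

Keeping all addresses in one constants class gives the sender and the consumer a single place to reference, which is roughly what an enum would have provided.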

Use the EventBus for that: https://quarkus.io/guides/reactive-event-bus

Send and forget is the way to go.

Dupondius answered 22/2, 2022 at 21:54 Comment(0)
