Sending JMS messages in a Spring WebFlux reactive handler: is it blocking?
Asked Answered
O

2

7

Is this the correct way to handle reactively? I see 2 threads one reactive nio which is until and including flatMap(fareRepo::save). The other thread is computations thread which starts from sending message and goes on till ServerResponse.build(). My question is this correct way to handle request reactively? Note: that fareRepo is reactive couchbase repo. thanks

return request.bodyToMono(Fare.class).flatMap(fareRepo::save).flatMap(fs -> {
            logger.info("sending message: {}, to queue", fs.getId());
            jmsTemplate.send("fare-request-queue", (session) -> session.createTextMessage(fs.getId()));
            return Mono.just(fs);
        }).flatMap(fi -> ServerResponse.created(URI.create("/fare/" + fi.getId())).build());
Octahedral answered 19/4, 2018 at 7:53 Comment(3)
Your question is off-topic because it is opinion-based primarily but this fareRepo::save is probably synchronous operation, so you kinda mixing non-blocking stuff and the blocking one. I would say it is not a good practice.Harlequinade
@john , forgot to mention that fareRepo is also reactive.Octahedral
@Octahedral mention is not needed as you're calling flatMap :)Overcheck
S
4

I'm assuming you're using Spring Framework's JmsTemplate implementation, which is blocking.

Without more context, we can only assume that you have a blocking operation in the middle of a reactive operator and that this will cause issues in your application.

Syllabify answered 24/4, 2018 at 17:51 Comment(1)
Assuming this is the case, I found that a (bad) alternative is to call .block() at the end of the reactive stream, but of course this defeats the purpose of using WebFluxHeadachy
C
4

Spring JmsTemplate will block your request thread which is not good for reactive design coding. you can try with .publishOn(Schedulers.elastic()) which will create new thread and execute code without blocking request thread. since it is I/O bound operation use Schedulers.elastic()

return request.bodyToMono(Fare.class).flatMap(fareRepo::save)
.publishOn(Schedulers.elastic())
.flatMap(fs -> {
            logger.info("sending message: {}, to queue", fs.getId());
            jmsTemplate.send("fare-request-queue", (session) -> session.createTextMessage(fs.getId()));
            return Mono.just(fs);
        }).flatMap(fi -> ServerResponse.created(URI.create("/fare/" + fi.getId())).build());
Cybil answered 28/9, 2020 at 21:17 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.