Getting exception while doing block() on Mono object I got back from ReactiveMongoRepository object

I have a service that streams data to a second service, which receives the stream of objects and saves them to my MongoDB. Inside the subscribe callback on the Flux that I get from the streaming service, I call the save method of the ReactiveMongoRepository interface. When I call block() on the returned Mono to get the saved data, I get the following error:

2019-10-11 13:30:38.559  INFO 19584 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:25}] to localhost:27017
2019-10-11 13:30:38.566  INFO 19584 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 0, 1]}, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=6218300}
2019-10-11 13:30:39.158  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : onNext(Quote(id=null, ticker=AAPL, price=164.8, instant=2019-10-11T10:30:38.800Z))
2019-10-11 13:30:39.411  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : cancel()
2019-10-11 13:30:39.429  INFO 19584 --- [ntLoopGroup-2-2] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:26}] to localhost:27017
2019-10-11 13:30:39.437  WARN 19584 --- [ctor-http-nio-4] io.netty.util.ReferenceCountUtil         : Failed to release a message: DefaultHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at 
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at

My code:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                    Mono<Quote> savedQuote = quoteRepository.save(quote);
                    System.out.println("I saved a quote! Id: " +savedQuote.block().getId());
                });

After some digging, I managed to get it to work, but I don't understand why it works now. The new code:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                    Mono<Quote> savedQuote = quoteRepository.insert(quote);
                    savedQuote.subscribe(result ->
                            System.out.println("I saved a quote! Id :: " + result.getId()));
                });

The definition of block(): Subscribe to this Mono and block indefinitely until a next signal is received.

The definition of subscribe(): Subscribe to this Mono and request unbounded demand.

Can someone help me understand why block() didn't work and subscribe() did? What am I missing here?

Roti answered 11/10, 2019 at 10:51 Comment(1)
Related: #51450389 (Bulb)

Blocking ties up a thread while it waits for a response. That is especially costly in a reactive framework, which has only a few threads at its disposal and is designed so that none of them is ever blocked unnecessarily.

This is the very thing that reactive frameworks are designed to avoid, so in this case Reactor simply stops you from doing it:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4

Your new code, in contrast, works asynchronously. The thread isn't blocked: nothing happens until the repository emits a value, at which point the lambda that you passed to savedQuote.subscribe() runs and prints your result to the console.
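
To make the difference concrete, here is a rough JDK-only model of the two styles. It is not Reactor's code: CompletableFuture stands in for Mono, and NonBlocking, EventLoopThread, block and runOnEventLoop are hypothetical names invented for this sketch. As far as I understand it, Reactor makes a comparable check on the current thread before block() is allowed to park it, which is where the IllegalStateException in the question comes from.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class BlockingGuardSketch {

    /** Hypothetical marker for threads that must never be parked. */
    interface NonBlocking {}

    /** Hypothetical event-loop thread, standing in for reactor-http-nio-4. */
    static final class EventLoopThread extends Thread implements NonBlocking {
        EventLoopThread(Runnable task) {
            super(task, "event-loop-1");
        }
    }

    /** Waits for the result, but refuses to do so on a NonBlocking thread. */
    static <T> T block(CompletableFuture<T> source) {
        if (Thread.currentThread() instanceof NonBlocking) {
            throw new IllegalStateException(
                    "blocking is not supported in thread " + Thread.currentThread().getName());
        }
        return source.join();
    }

    /** Runs the task on a fresh event-loop thread and returns what it reported. */
    static String runOnEventLoop(Supplier<String> task) {
        String[] report = new String[1];
        Thread loop = new EventLoopThread(() -> {
            try {
                report[0] = task.get();
            } catch (IllegalStateException e) {
                report[0] = "rejected: " + e.getMessage();
            }
        });
        loop.start();
        try {
            loop.join(); // the caller is an ordinary thread, so waiting here is fine
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return report[0];
    }

    public static void main(String[] args) {
        CompletableFuture<String> pendingSave = new CompletableFuture<>();

        // Style 1: try to wait for the save on the event loop. The guard refuses.
        System.out.println(runOnEventLoop(() -> block(pendingSave)));

        // Style 2: register a callback on the event loop and return immediately.
        CompletableFuture<String> printed = new CompletableFuture<>();
        System.out.println(runOnEventLoop(() -> {
            pendingSave.thenAccept(id -> printed.complete("I saved a quote! Id :: " + id));
            return "callback registered";
        }));

        // The "database" answers later; only now does the callback run.
        pendingSave.complete("42");
        System.out.println(printed.join());
    }
}
```

The first call is rejected before any waiting starts, while the second returns as soon as the callback is registered, so the event-loop thread stays free to handle other work in the meantime.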

However, the new code still isn't optimal / normal from a reactive streams perspective, as you're doing all your logic in your subscribe method. The normal thing to do is to use a series of flatMap/map calls to transform the items in the stream, and use doOnNext() for side effects (such as printing out a value):

stockQuoteClient.getQuoteStream()
            .log("quote-monitor-service")
            .flatMap(quoteRepository::insert)
            .doOnNext(result -> System.out.println("I saved a quote! Id :: " + result.getId()))
            .subscribe();
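
If you want to try that chain without a MongoDB instance, the following is a minimal sketch that swaps the repository for an in-memory stand-in. It assumes reactor-core is on the classpath. Quote, insert and saveAll are hypothetical names made up for illustration (the second quote's ticker and price are invented sample data); Flux, Mono, flatMap, doOnNext, map, collectList and block are Reactor's own.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class QuotePipelineSketch {

    /** Hypothetical stand-in for the Quote document from the question. */
    static final class Quote {
        final String id;
        final String ticker;
        final double price;

        Quote(String id, String ticker, double price) {
            this.id = id;
            this.ticker = ticker;
            this.price = price;
        }
    }

    /** Hypothetical in-memory replacement for quoteRepository.insert(quote). */
    static Mono<Quote> insert(Quote quote, AtomicInteger nextId) {
        // Nothing runs until the Mono is subscribed to, as with the real repository.
        return Mono.fromSupplier(
                () -> new Quote("id-" + nextId.incrementAndGet(), quote.ticker, quote.price));
    }

    /** Saves every quote in the stream and returns the generated ids. */
    static List<String> saveAll(Flux<Quote> quotes) {
        AtomicInteger nextId = new AtomicInteger();
        return quotes
                .flatMap(quote -> insert(quote, nextId))   // asynchronous step
                .doOnNext(saved -> System.out.println("I saved a quote! Id :: " + saved.id))
                .map(saved -> saved.id)                    // synchronous transformation
                .collectList()
                .block(); // acceptable only because the caller is an ordinary thread
    }

    public static void main(String[] args) {
        List<String> ids = saveAll(Flux.just(
                new Quote(null, "AAPL", 164.8),
                new Quote(null, "MSFT", 139.7)));
        System.out.println(ids.size() + " quotes saved");
    }
}
```

Note that block() is called here from main, which is a plain thread. The same call made inside a callback running on a reactor-http-nio thread would be rejected, which is the situation in the question.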

If you're doing any serious amount of work with reactor / reactive streams, it would be worth reading up on them in general. They're very powerful for non-blocking work, but they do require a different way of thinking (and coding) than more "standard" Java.

Bulb answered 11/10, 2019 at 11:13 Comment(0)
