I have a service that streams data to a second service, which receives a stream of objects and saves them to my MongoDB. Inside the subscribe callback on the Flux that I get from the streaming service, I call the save method from the ReactiveMongoRepository interface. When I call block() on the returned Mono to get the saved data, I get the following error:
2019-10-11 13:30:38.559 INFO 19584 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1, serverValue:25}] to localhost:27017
2019-10-11 13:30:38.566 INFO 19584 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 0, 1]}, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=6218300}
2019-10-11 13:30:39.158 INFO 19584 --- [ctor-http-nio-4] quote-monitor-service : onNext(Quote(id=null, ticker=AAPL, price=164.8, instant=2019-10-11T10:30:38.800Z))
2019-10-11 13:30:39.411 INFO 19584 --- [ctor-http-nio-4] quote-monitor-service : cancel()
2019-10-11 13:30:39.429 INFO 19584 --- [ntLoopGroup-2-2] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:26}] to localhost:27017
2019-10-11 13:30:39.437 WARN 19584 --- [ctor-http-nio-4] io.netty.util.ReferenceCountUtil : Failed to release a message: DefaultHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
at
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
at
My code:
stockQuoteClient.getQuoteStream()
    .log("quote-monitor-service")
    .subscribe(quote -> {
        Mono<Quote> savedQuote = quoteRepository.save(quote);
        System.out.println("I saved a quote! Id: " + savedQuote.block().getId());
    });
After some digging, I managed to get it to work, but I don't understand why it works now. The new code:
stockQuoteClient.getQuoteStream()
    .log("quote-monitor-service")
    .subscribe(quote -> {
        Mono<Quote> savedQuote = quoteRepository.insert(quote);
        savedQuote.subscribe(result ->
            System.out.println("I saved a quote! Id :: " + result.getId()));
    });
The documented definition of block(): "Subscribe to this Mono and block indefinitely until a next signal is received."
The documented definition of subscribe(): "Subscribe to this Mono and request unbounded demand."
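To make the two definitions concrete, here is a minimal sketch of the difference as I understand it. It is only an analogy: it uses java.util.concurrent.CompletableFuture as a stand-in for Mono, not Reactor's API, and saveAsync is a hypothetical placeholder for quoteRepository.save(quote). join() makes the calling thread wait for the result, which is roughly what block() does, while thenAccept() only registers a callback and returns immediately, which is roughly what subscribe() does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class BlockVsSubscribe {

    // Hypothetical stand-in for quoteRepository.save(quote):
    // completes asynchronously on another thread with a made-up id.
    static CompletableFuture<String> saveAsync(String ticker) {
        return CompletableFuture.supplyAsync(() -> ticker + "-id");
    }

    // "block" style: the calling thread waits here until the result arrives.
    static String blockingStyle(String ticker) {
        return saveAsync(ticker).join();
    }

    // "subscribe" style: register a callback and return immediately;
    // the callback runs later, once the result is available.
    static CompletableFuture<Void> callbackStyle(String ticker, AtomicReference<String> sink) {
        return saveAsync(ticker).thenAccept(sink::set);
    }

    public static void main(String[] args) {
        System.out.println("blocking: " + blockingStyle("AAPL"));

        AtomicReference<String> sink = new AtomicReference<>();
        // join() here only keeps the demo alive until the callback has run.
        callbackStyle("AAPL", sink).join();
        System.out.println("callback: " + sink.get());
    }
}
```

In my first version the waiting would happen on reactor-http-nio-4, which is the thread the stack trace says does not support block().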
Can someone help me understand why block() didn't work and subscribe() did? What am I missing here?