I've read that there is an option to make a blocking call using Mono. So I tried to write several code snippets:
A)
Mono.just("qwerty")
    .subscribeOn(Schedulers.boundedElastic())
    .publishOn(Schedulers.boundedElastic())
    .block();
B)
Mono<String> customMono = Mono.just("qwerty");
Mono<String> blockedMono = customMono
    .subscribeOn(Schedulers.boundedElastic())
    .publishOn(Schedulers.boundedElastic());
System.out.println("blockedMono.block(): " + blockedMono.block());
C)
Mono<String> customMono = Mono.just("qwerty");
Mono<String> blockedMono = Mono.just(0)
    .subscribeOn(Schedulers.boundedElastic())
    .publishOn(Schedulers.boundedElastic())
    .then(customMono);
System.out.println("blockedMono.block(): " + blockedMono.block());
All three snippets lead to the same error:
block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
Is there a way to fix it?
P.S. I need a blocking call, and I am aware that blocking operations are discouraged in reactive code.
P.S.2
This works, but I want to avoid converting to a Future:
Mono.just("qwerty").toFuture().get()
P.S.3
As @dan1st noticed, the behaviour depends on the execution context.
All of these code snippets work without any exceptions if we put them in the main method.
The behaviour described in the question occurs if we put the code inside
@GetMapping(..) public void someEndpoint(...) { // snippet is here }
So this behaviour depends on Spring WebFlux somehow.
Could you please clarify why, and how to fix it?
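To make the dependence on the execution context concrete, here is a rough plain-Java model of the check, not Reactor's actual code. It assumes that the blocking call inspects the thread that invokes it and refuses to proceed when that thread is marked as non-blocking. NonBlockingMarker, EventLoopLikeThread, guardedBlock and callFrom are hypothetical names made up for this sketch.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a marker that flags threads which must never block.
interface NonBlockingMarker {}

// Hypothetical thread type playing the role of "parallel-1" or an event-loop thread.
class EventLoopLikeThread extends Thread implements NonBlockingMarker {
    EventLoopLikeThread(Runnable task, String name) {
        super(task, name);
    }
}

class BlockingGuardSketch {

    // Mimics a guarded block(): the check looks at the thread that CALLS it,
    // not at the thread that produces the value upstream.
    static <T> T guardedBlock(Supplier<T> source) {
        Thread caller = Thread.currentThread();
        if (caller instanceof NonBlockingMarker) {
            throw new IllegalStateException(
                    "blocking is not supported in thread " + caller.getName());
        }
        return source.get();
    }

    // Calls guardedBlock from a plain thread or from a marked thread and reports the outcome.
    static String callFrom(boolean markedThread) {
        String[] outcome = new String[1];
        Runnable task = () -> {
            try {
                outcome[0] = guardedBlock(() -> "qwerty");
            } catch (IllegalStateException e) {
                outcome[0] = "error: " + e.getMessage();
            }
        };
        Thread thread = markedThread
                ? new EventLoopLikeThread(task, "parallel-1")
                : new Thread(task, "main-like");
        thread.start();
        try {
            thread.join(); // join() makes the write to outcome[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(callFrom(false)); // prints: qwerty
        System.out.println(callFrom(true));  // prints: error: blocking is not supported in thread parallel-1
    }
}
```

If this model is close to what Reactor does, it would explain why subscribeOn and publishOn do not help in snippets A to C: they move the upstream work to another scheduler, but block() is still invoked on the thread that runs the controller method. It would also be consistent with toFuture().get() working, since that call does not go through such a check.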
UPDATE
Based on the answer, I was able to write the code block below. As a result, the line
org.springframework.security.oauth2.jwt.Jwt jwt = it.block();
no longer throws an exception, but it returns null.
@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
    Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
        ReactiveSecurityContextHolder.getContext()
            .publishOn(Schedulers.boundedElastic()) //<-- this allows .block() somewhere downstream
            .subscribeOn(Schedulers.boundedElastic())
            .flatMap(securityContext -> Mono.just((org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal()));
    Mono<org.springframework.security.oauth2.jwt.Jwt> secondMono = Mono.just(firstMono)
        .publishOn(Schedulers.boundedElastic())
        .map(it -> {
            org.springframework.security.oauth2.jwt.Jwt jwt = it.block();
            System.out.println(Thread.currentThread() + "-" + jwt);
            return jwt;
        });
    return secondMono;
}
So the endpoint method fails with the error:
java.lang.NullPointerException: The mapper [....my.Controller$$Lambda$2012/0x0000000800b68840] returned a null value
But if I write
@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
    Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
        ReactiveSecurityContextHolder.getContext()
            .map(securityContext -> (org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal());
    return firstMono;
}
everything is OK and the response contains the JWT. It looks like this is because of Spring WebFlux context magic.
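One way to picture the "context magic" is the plain-Java model below. It is only a sketch under an assumption: that the security context travels with the subscription that the framework starts on the returned Mono, so a nested block() starts a separate subscription that carries no such context and therefore completes empty. LazySource, principalFromContext, FRAMEWORK_CONTEXT, the "principal" key and the "jwt-of-alice" value are all hypothetical and exist only for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

class ContextSketch {

    // Hypothetical model of a Mono: nothing runs until subscribe(),
    // and the subscriber hands its context to the source at that moment.
    interface LazySource<T> {
        Optional<T> subscribe(Map<String, Object> context);

        // Operators in the same chain pass the subscriber's context upstream.
        default <R> LazySource<R> map(Function<T, R> fn) {
            return ctx -> subscribe(ctx).map(fn);
        }

        // A nested blocking call is a brand-new subscription with an empty context.
        default T block() {
            return subscribe(Map.of()).orElse(null);
        }
    }

    // Plays the role of reading the principal from the security context:
    // it yields a value only if the subscriber's context holds one.
    static LazySource<String> principalFromContext() {
        return ctx -> Optional.ofNullable((String) ctx.get("principal"));
    }

    // The context that the framework is assumed to attach when it subscribes.
    static final Map<String, Object> FRAMEWORK_CONTEXT = Map.of("principal", "jwt-of-alice");

    // Second variant from the question: return the chain, let the framework subscribe.
    static Optional<String> chained() {
        LazySource<String> chain = principalFromContext().map(p -> "seen: " + p);
        return chain.subscribe(FRAMEWORK_CONTEXT);
    }

    // First variant from the question: call block() on the inner source inside the outer chain.
    static String nestedBlock() {
        LazySource<String> inner = principalFromContext();
        // The outer chain receives the framework context, but inner.block() never sees it.
        LazySource<String> outer = ctx -> Optional.ofNullable(inner.block());
        return outer.subscribe(FRAMEWORK_CONTEXT).orElse(null);
    }

    public static void main(String[] args) {
        System.out.println(chained());     // prints: Optional[seen: jwt-of-alice]
        System.out.println(nestedBlock()); // prints: null
    }
}
```

Under that assumption, the null from it.block() is not a threading problem at all: the inner Mono simply has nothing to emit when it is subscribed on its own, which is why keeping everything in one returned chain behaves differently.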
... void in your example)? – Tridentum
join()ing the thread in a reactive method is a bad idea - it's best not to block there at all. – Tridentum