block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-

I've read that there is an option to make a blocking call using Mono. So I tried to write several code snippets:

A)

Mono.just("qwerty")
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
.block()

B)

Mono<String> customMono = Mono.just("qwerty");
Mono<String> blockedMono = customMono
        .subscribeOn(Schedulers.boundedElastic())
        .publishOn(Schedulers.boundedElastic());
System.out.println("blockedMono.block(): " + blockedMono.block());

C)

 Mono<String> customMono = Mono.just("qwerty");
 Mono<String> blockedMono =  Mono.just(0)
            .subscribeOn(Schedulers.boundedElastic())
            .publishOn(Schedulers.boundedElastic())
            .then(customMono);    
 System.out.println("blockedMono.block(): " + blockedMono.block());

All three snippets lead to the same error:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1

Is there a way to fix it?

P.S. I need a blocking call, and I am aware that it is not good to use blocking operations in reactive code.

P.S.2

This works but I want to avoid converting to Future

Mono.just("qwerty").toFuture().get()

P.S.3

As @dan1st noticed the behaviour depends on execution context.

  1. All those code snippets work without any exceptions if we put them in the main method.

  2. The behaviour described in the topic occurs if we put the code inside a controller endpoint:

    @GetMapping(..)
    public void someEndpoint(...) {
       // snippet is here
    }
    

So this behaviour somehow depends on Spring WebFlux.

Could you please clarify why, and how to fix it?

UPDATE

Based on the answer I was able to write the code block below. As a result this line doesn't throw any exception but it returns null.

org.springframework.security.oauth2.jwt.Jwt jwt = it.block();

@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
    Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
            ReactiveSecurityContextHolder.getContext()
                    .publishOn(Schedulers.boundedElastic()) //<-- this allows .block() somewhere downstream
                    .subscribeOn(Schedulers.boundedElastic())
                    .flatMap(securityContext -> Mono.just((org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal()));
    Mono<org.springframework.security.oauth2.jwt.Jwt> secondMono = Mono.just(firstMono)
            .publishOn(Schedulers.boundedElastic())
            .map(it -> {
                org.springframework.security.oauth2.jwt.Jwt jwt = it.block();
                System.out.println(Thread.currentThread() + "-" + jwt);
                return jwt;
            });
    return secondMono;
}

So the endpoint method fails with this error:

java.lang.NullPointerException: The mapper [....my.Controller$$Lambda$2012/0x0000000800b68840] returned a null value

But if I write

@GetMapping("/test")
public Mono<org.springframework.security.oauth2.jwt.Jwt> test() throws ExecutionException, InterruptedException {
    Mono<org.springframework.security.oauth2.jwt.Jwt> firstMono =
            ReactiveSecurityContextHolder.getContext()
                    .map(securityContext ->(org.springframework.security.oauth2.jwt.Jwt) securityContext.getAuthentication().getPrincipal());        
    return firstMono;
}

everything is OK and the response contains the JWT. It looks like this is because of some Spring WebFlux context magic.

Elsy answered 9/1, 2023 at 14:17 Comment(12)
Isn't this a continuation of your earlier question, #75055571? - Septavalent
A continuation, but not a duplicate! It is a rule of thumb to create a separate topic for a new question. - Elsy
What created the thread your code is running in? Does it work outside of Reactor threads? - Tridentum
@Tridentum it is written just inside the main method. - Elsy
Possible answer to your question: how to use block() operator - Slouch
@Tridentum Sorry, my previous comment was wrong. So you were right: the behaviour depends on context. I updated the topic. Could you please provide more details in an answer? - Elsy
@Slouch I tried different options but I was not able to get the expected result. I would appreciate it if you could provide a single working option which allows me to make a blocking call. - Elsy
Is it possible to create a new thread in the endpoint and block in that thread (the endpoint seems to return void in your example)? - Tridentum
@Tridentum let me try. - Elsy
@Tridentum I've shared the results of my experiments here: #75062975 - Elsy
Note that join()ing the thread in a reactive method is a bad idea; it's best not to block there at all. - Tridentum
@Tridentum It is just for experimental purposes. - Elsy

Disclaimer

This is a rephrase and clarification of my answer from another post.

Blocking in Reactor

When you block in Reactor, the thread that calls block() is locked, waiting for the block operation to end.

One important fact is that the blocked thread is the calling thread, not one of the publisher's threads. It might look like an obvious statement, but I've been bitten by this mistake in the past, and so have many other users (it is the subject of your question, for example).

As block is not part of the reactive pipeline, when you do this:

public static void main(String[] args) {
   Mono.just(1000)
       .publishOn(Schedulers.single())
       .flatMap(delay -> Mono.delay(Duration.ofMillis(delay))) // <1>
       .block(); // <2>
}
  1. flatMap is executed by the single scheduler.
  2. block is executed by the program's main thread.

And when you do this:

public static void main(String[] args) {
    var publisher = Mono.just(1000)
        .subscribeOn(Schedulers.single())
        .flatMap(delay -> Mono.delay(Duration.ofMillis(delay)));
   
    Mono.just("whatever")
        .publishOn(Schedulers.boundedElastic())
        .map(it -> publisher.block()) // <1>
        .block(); // <2>
}
  1. The first block is executed on a thread of the bounded elastic scheduler.
  2. The second block is executed from the program's main thread.
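
To make the "who executes what" distinction visible, here is a minimal sketch that reports the name of the thread running a pipeline step. The class and method names (WhoRunsWhat, pipelineThreadName) are made up for illustration; only the Reactor calls come from the library.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the original answer.
final class WhoRunsWhat {

    /** Returns the name of the thread that executed the map() step. */
    static String pipelineThreadName() {
        return Mono.just(1)
                   .publishOn(Schedulers.single())
                   // Runs on a thread owned by the single scheduler.
                   .map(i -> Thread.currentThread().getName())
                   // Parks whichever thread called pipelineThreadName().
                   .block();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("pipeline: " + pipelineThreadName());
    }
}
```

Run from main, the two printed names should differ: the caller is the thread that gets blocked, while the map step runs on a scheduler thread.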

When blocking is forbidden

To discourage its usage, Reactor enforces a context check. When block() is called, it checks which thread is calling, and if it considers that the calling thread comes from a scheduler that is incompatible with blocking, it raises an error to prevent that thread from being locked.

Most Reactor and WebFlux schedulers are incompatible with block(), because they are designed for high throughput with minimal resources.

Therefore, when you return a publisher containing block operations in WebFlux, most of the time it will be executed on a scheduler that does not accept it, and you end up with the error you describe.
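
The check can be observed outside of WebFlux as well. The sketch below is an illustration under assumptions: the class BlockRejectedDemo and the helper blockOn are hypothetical names, and the inner Mono.delay stands in for any genuinely asynchronous publisher. It calls block() from inside a pipeline and reports whether Reactor accepted or rejected it on the given scheduler.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the original answer.
final class BlockRejectedDemo {

    /** Calls block() from a thread of the given scheduler and reports the outcome. */
    static String blockOn(Scheduler scheduler) {
        return Mono.just("qwerty")
                   .publishOn(scheduler)
                   .map(value -> {
                       try {
                           // An asynchronous source, so block() really has to wait.
                           Mono.delay(Duration.ofMillis(10)).block();
                           return "allowed on " + Thread.currentThread().getName();
                       } catch (IllegalStateException e) {
                           // Thrown when the calling thread is marked as non-blocking.
                           return "rejected: " + e.getMessage();
                       }
                   })
                   .block(); // called by the caller of blockOn(), e.g. the main thread
    }

    public static void main(String[] args) {
        System.out.println(blockOn(Schedulers.parallel()));
        System.out.println(blockOn(Schedulers.boundedElastic()));
    }
}
```

On the parallel scheduler the inner block() should be rejected with the message from the question, while on boundedElastic it should be allowed.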

How to block in the middle of a reactive pipeline

First and foremost, try to avoid blocking. Check whether it can be avoided. Otherwise:

  1. Select a blocking-compatible scheduler (for example, Schedulers.boundedElastic()).
  2. Wrap the publisher you want to block in another one that publishes / schedules its actions on your scheduler:
    Scheduler blockingCompatibleScheduler = Schedulers.boundedElastic();
    Mono<T> toBlock = Mono...
    Mono<T> wrapper = Mono.fromCallable(() -> toBlock.block())
                          .subscribeOn(blockingCompatibleScheduler);
    // or
    Mono<T> wrapper = Mono.just("any")
                          .publishOn(blockingCompatibleScheduler)
                          .map(it -> toBlock.block());
    
    More details are available in the dedicated section of the official documentation.
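
As a runnable illustration of the wrapping steps above, here is a minimal sketch. It assumes a parallel scheduler thread as a stand-in for a WebFlux event-loop thread; the class name BlockingWrapperDemo, the helper blockOnElastic and the slowSource publisher are hypothetical.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of the original answer.
final class BlockingWrapperDemo {

    /** Wraps toBlock.block() so that the blocking call runs on boundedElastic. */
    static <T> Mono<T> blockOnElastic(Mono<T> toBlock) {
        return Mono.fromCallable(() -> toBlock.block())
                   .subscribeOn(Schedulers.boundedElastic());
    }

    static String run() {
        // Assumed stand-in for a publisher that you cannot avoid blocking on.
        Mono<String> slowSource = Mono.delay(Duration.ofMillis(50)).map(tick -> "qwerty");

        return Mono.just("start")
                   .publishOn(Schedulers.parallel()) // simulate a non-blocking thread
                   .flatMap(ignored -> blockOnElastic(slowSource))
                   .block(); // outer block is called by the caller of run()
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The flatMap step itself still runs on the non-blocking thread, but it only subscribes to the wrapper; the actual block() call is moved to boundedElastic by subscribeOn.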

Block breaks context

Until now, I was referring to the execution context. But there is a second kind: the state context.

Since Reactor 3.1, a Context / ContextView API is provided to share contextual information across chained subscriptions, from downstream to upstream.

The official documentation contains a dedicated section about this mechanism that explains it in depth (it is quite complex).

Due to its nature, the block operator prevents this mechanism from working. A context propagates information from a downstream subscription to an inner/upstream one, so it cannot provide context information to the block operator: block uses a hidden subscription, disconnected from the parent/downstream subscription/pipeline. In other words, a Flux/Mono cannot see an inner blocked publisher, and therefore cannot propagate its context into it.

We can test it in a simplified example:

import reactor.core.publisher.Mono;

public class BlockBreaksContext {

    static final Object CTX_KEY = new Object();

    /**
     * Add "Hello" message to the provided action context, then run/block it and print
     * output value.
     */
    static void execute(Mono<String> action) {
        String value = Mono.from(action)
                           .contextWrite(ctx -> ctx.put(CTX_KEY, "Hello"))
                           .block();
        System.out.println(value);
    }

    public static void main(String[] args) {
        Mono<String> getContextValue = Mono.deferContextual(ctx
                -> Mono.just(ctx.getOrDefault(CTX_KEY, "No value from context")));

        // Without blocking, the mono will receive the context written by downstream subscription
        execute(getContextValue.map(it -> "NO BLOCKING: " + it));

        // With blocking, the Mono is **not** part of the "main" pipeline/subscription.
        // Therefore, it cannot receive the context on execution, because block causes an
        // independent/isolated subscription that the main chain of events is unaware of.
        Mono<String> wrapBlock = Mono.fromCallable(() -> getContextValue.block());
        execute(wrapBlock.map(it -> "BLOCKING: " + it));
    }
}

This program prints:

NO BLOCKING: Hello
BLOCKING: No value from context

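In your question, you try to access a security token. As the token is resolved upon client request, WebFlux puts it in the context of the response publisher. When you block, you disassociate your blocked publisher from the response publisher, so the security context is no longer visible to it. ReactiveSecurityContextHolder.getContext() then completes empty, which is why block() returns null in your update.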
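
For comparison, here is a sketch of the alternative: composing the inner publisher with flatMap instead of blocking on it keeps it inside the main subscription, so the context still reaches it. This reuses the idea of the simplified example above rather than real Spring Security classes; the names ComposeKeepsContext, composed and blocked are hypothetical.

```java
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the original answer.
final class ComposeKeepsContext {

    static final Object CTX_KEY = new Object();

    // Reads the value from the subscriber context, like getContextValue above.
    static final Mono<String> GET_CONTEXT_VALUE = Mono.deferContextual(ctx ->
            Mono.just(ctx.getOrDefault(CTX_KEY, "No value from context")));

    /** Inner publisher composed with flatMap: it stays part of the main subscription. */
    static String composed() {
        return Mono.just("any")
                   .flatMap(it -> GET_CONTEXT_VALUE)
                   .contextWrite(ctx -> ctx.put(CTX_KEY, "Hello"))
                   .block();
    }

    /** Inner publisher blocked: it gets its own isolated subscription. */
    static String blocked() {
        return Mono.fromCallable(() -> GET_CONTEXT_VALUE.block())
                   .contextWrite(ctx -> ctx.put(CTX_KEY, "Hello"))
                   .block();
    }

    public static void main(String[] args) {
        System.out.println("composed: " + composed());
        System.out.println("blocked:  " + blocked());
    }
}
```

This mirrors the second controller in the question, which works because it returns the composed Mono and lets WebFlux subscribe to it with the request context attached.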

Slouch answered 12/1, 2023 at 13:3 Comment(7)
I've got the idea, but I have an additional related question. Please read my topic update. - Elsy
That's because there's an execution context (Scheduler API) and a state context (Context API). Blocking breaks both. I will try to edit my answer with more details later (when I've got a little spare time). - Slouch
Thank you, I will wait for the update from your side. - Elsy
Kind reminder. - Elsy
Oh, sorry, I thought that you would have been notified of the edit. I edited my answer last Friday. I've added a "Block breaks context" section at the end of my answer. It shows that block prevents access to the ContextView object. - Slouch
Oh... let me read... - Elsy
Thank you very much for your answer. I have a couple of questions though: 1. Is block() considered the same as .toFuture().get()? 2. "Webflux put it in the response publisher context": where does Spring WebFlux store it? Is it possible to access this from my code? I've created a separate topic for that: #75124644 - Elsy
