what does Mono.defer() do?

4

67

I've come across Mono.defer() in some Spring WebFlux code.

I looked up the method in the docs but don't understand the explanation:

"Create a Mono provider that will supply a target Mono to subscribe to for each Subscriber downstream"

Please could I have an explanation and an example? Is there a place with a collection of Reactor example code (their unit tests, perhaps) that I could reference?

Thanks.

Comparator answered 2/5, 2019 at 15:19 Comment(4)
Have you checked the javadoc? The majority of methods on Flux/Mono include a diagram of how they work with the reactive stream. - Carlotacarlotta
Yeah, the quote above comes from the javadoc. I personally find the language hard to understand; some trivial code examples would help me. I'm cloning the reactor-core project and going to look through the unit tests to see if that helps. - Comparator
The combination of the example code, the link, and the comment that RxJava's Observable should be read as Mono or Flux in its examples helped me. - Comparator
Reactor code is hard to read and predict, at least initially. Also, these diagrams are yet another language for which I would need a tutorial :) - Chilung
154

It is a bit of an oversimplification, but conceptually Reactor sources are either lazy or eager. More advanced ones, like an HTTP request, are expected to be lazily evaluated. On the other hand, the simplest ones, like Mono.just or Flux.fromIterable, are eager.

By that, I mean that calling Mono.just(System.currentTimeMillis()) will immediately invoke the currentTimeMillis() method and capture the result. Said result is only emitted by the Mono once it is subscribed to. Subscribing multiple times doesn't change the value either:

Mono<Long> clock = Mono.just(System.currentTimeMillis());
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //we use block for demonstration purposes, returns t0

Thread.sleep(7_000);
//time == t17
clock.block(); //we re-subscribe to clock, still returns t0

The defer operator is there to make this source lazy, re-evaluating the content of the lambda each time there is a new subscriber:

Mono<Long> clock = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //invokes currentTimeMillis() here and returns t10

Thread.sleep(7_000);
//time == t17
clock.block(); //invokes currentTimeMillis() once again here and returns t17
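
The same distinction can be reproduced without Reactor, which may make it clearer that this is ordinary Java evaluation order rather than library behavior. What follows is only a plain-Java analogy, not Reactor code: a captured long plays the role of Mono.just(...), a Supplier<Long> plays the role of Mono.defer(...), and the class name EagerVsDeferred and the counter-based clock (a deterministic stand-in for System.currentTimeMillis()) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class EagerVsDeferred {

    // Returns the values seen by two reads of each kind of source.
    static List<Long> demo() {
        // Hypothetical clock: every call returns the next tick (1, 2, 3, ...).
        AtomicLong ticks = new AtomicLong();
        Supplier<Long> now = ticks::incrementAndGet;

        // Eager: now.get() runs on this line, like the argument of Mono.just(...).
        long captured = now.get();

        // Deferred: the lambda body runs only when get() is called,
        // like the supplier passed to Mono.defer(...).
        Supplier<Long> deferred = () -> now.get();

        List<Long> seen = new ArrayList<>();
        seen.add(captured);        // first read of the eager value
        seen.add(captured);        // second read: same value
        seen.add(deferred.get());  // first deferred read: the clock is consulted now
        seen.add(deferred.get());  // second deferred read: the clock is consulted again
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 1, 2, 3]
    }
}
```

The eager value stays at the first tick no matter how often it is read, while each deferred read advances the clock, which mirrors t0 versus t10/t17 above.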
Unlive answered 3/5, 2019 at 14:48 Comment(6)
While I understand this answer, I never liked this kind of explanation, as it kind of suggests that it's a library choice that Mono.just(System.currentTimeMillis()) executes immediately when called. But that's just how Java works; no one would expect the method call to somehow be removed and delayed by the library. - Paronym
Yeah, that's just how Java works: a method invocation not wrapped in a lambda cannot be magically made lazy. I didn't intend to make it sound like the library is making a "choice". - Benco
@SimonBaslé I wrote a Netty HTTP client (why I couldn't use Reactor Netty is another topic) that returns a Flux. I don't want any emissions to happen until someone subscribes, and am using defer as follows. Is this the correct usage? publisher = UnicastProcessor.create<ByteBuf>(); chunks = Flux.defer { publisher }. I return chunks to the caller and publish on publisher. - Sain
It looks like your library itself guarantees a single subscriber (otherwise you'd see errors with the UnicastProcessor). I don't think wrapping it in a defer adds any value; the UnicastProcessor already is a Flux. There is also Flux.create, which you should look at. - Benco
@SimonBaslé Could you please have a look at my question https://mcmap.net/q/296834/-what-is-the-correct-reactive-pattern-in-java-spring-webflux-to-return-a-large-file-from-the-rest-controller/2886891 ? Thanks a lot! - Tantalizing
@SimonBaslé Could you please have a look at my question https://mcmap.net/q/296835/-interference-between-onerrorresume-and-thenreturn/2886891 ? Thanks a lot! - Tantalizing
37

In simple words: at first glance it looks like Mono.just(), but it is not. When you call Mono.just(), it immediately creates an Observable (Mono) and reuses it; when you use defer, it doesn't create it immediately but creates a new Observable on every subscribe.

One use case to see the difference:

int a = 5; // instance field, so the lambda below reads its current value

@Override
public void run(String... args) throws Exception {

    Mono<Integer> monoJust = Mono.just(a);                    // a is read here, once
    Mono<Integer> monoDefer = Mono.defer(() -> Mono.just(a)); // a is read on each subscribe

    monoJust.subscribe(integer1 -> System.out.println(integer1));
    monoDefer.subscribe(integer1 -> System.out.println(integer1));

    a = 7;
    monoJust.subscribe(integer1 -> System.out.println(integer1));
    monoDefer.subscribe(integer1 -> System.out.println(integer1));
}

This prints:

5
5
5
7

As you can see, Mono.just created the observable immediately, so its output doesn't change even though the value has changed, whereas defer creates the observable on subscribe, so you work with the value that is current at subscription time.
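
One caveat, sketched here in plain Java rather than Reactor: what gets fixed when Mono.just(a) is called is the value of the argument expression. For an int that is the number itself; for an object it is the reference, so later mutations of that object remain visible. In this analogy a captured variable stands in for Mono.just(...) and a Supplier for Mono.defer(...); the class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class CapturedReference {

    private int a = 5;
    private final List<String> items = new ArrayList<>(List.of("x"));

    // Returns what each source reports after the underlying state has changed.
    static String demo() {
        CapturedReference state = new CapturedReference();

        int justInt = state.a;                      // copies the number 5
        Supplier<Integer> deferInt = () -> state.a; // reads the field on each get()
        List<String> justList = state.items;        // copies the reference, not the contents

        state.a = 7;
        state.items.add("y");

        // justInt is unaffected, deferInt sees 7, justList sees the mutation.
        return justInt + " " + deferInt.get() + " " + justList;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 5 7 [x, y]
    }
}
```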

Rhizomorphous answered 3/5, 2019 at 8:24 Comment(11)
Thanks @RichardKollcaku. So the Mono holds a snapshot of the state of its underlying value, and defer says: get the current state each time subscribe is called. - Comparator
Your explanation is not totally right. There is no such thing as "create new Observable": both just and defer create an Observable. Mono.just returns the same value for each subscription, while Mono.defer evaluates the Supplier function on each subscription. - Caster
I'm not talking about Observable as an object but Observable as a concept. Mono is an Observable. "Mono.just creates a new Observable" means it creates a new Mono, @htn. - Rhizomorphous
@jrender It cannot really be described as a snapshot of state in the case of Mono.just(). In this example he uses a primitive type, so the result does not change, but if you use an object (such as a list) and mutate it (add a new value) between two subscriptions, you will get two different results, even though it's still the same object. - Caster
@RicardKollcaku Don't worry, I know very well how the defer operator works. The term Observable does not exist in Spring Reactor, so Observable = Mono | Flux for me. - Caster
@hth So both Monos will contain MyList; the deferred Mono will have the latest state of MyList each time subscribe is called, whereas a 'normal' Mono will have the state of the list when it was created. - Comparator
@RichardKollcaku How much of that RxJava stuff is applicable when learning project-reactor? - Comparator
@jrender About 80% of RxJava is in Project Reactor, as Project Reactor was created mostly by RxJava devs but built on Java 8 rather than Java 6. There are some new components in RxJava and some in Project Reactor, but in the big picture they are almost the same. - Rhizomorphous
@jrender Mono.just() always returns the same object for each subscription, but this object can be mutated over time. So we can have a list of 2 items for the first subscription and 3 items for the second. With Mono.defer(Supplier), the Supplier is evaluated for each subscription, so you can get different Monos, even an empty Mono. - Caster
@htn Observable and Observer are terms used more widely in reactive programming; that's why I thought using the term Observable would be an easier way to explain. - Rhizomorphous
So basically defer is a mutable publisher while just is immutable? - Olathe
6

I was trying defer for a different use case and wrote the code below to check; I'm sharing it in case it helps others. My use case was to chain two Monos and ensure that the first one completes before the second one is taken up. The second one involves a blocking call whose result is used to respond with a Mono carrying either an empty or an error response. Without defer, my blocking call is executed irrespective of the result of the first Mono. With defer, the blocking call is executed only when the first Mono completes. Code below:

public static void main(String[] args) {
    long cur = System.currentTimeMillis();
    boolean succeed = true;

    Mono<Integer> monoJust = Mono.create(consumer -> {
        System.out.println("MonoJust inside " + (System.currentTimeMillis() - cur));
        if (succeed) {
            consumer.success(1);
        } else {
            consumer.error(new RuntimeException("aaa"));
        }
    });

    Mono<String> monoJustStr = Mono.create(consumer -> {
        System.out.println("MonoJustStr inside: " + (System.currentTimeMillis() - cur));
        consumer.success("one");
    });

    System.out.println("##1##: Begin");
    monoJust.then(evaluator() ? Mono.empty() : monoJustStr).subscribe(d -> System.out.println("##1##: "+d), e-> System.err.println(e));
    System.out.println("##1##: Done: "+(System.currentTimeMillis() - cur));

    System.out.println("\n\n\n##2##: Begin");
    monoJust.then(Mono.defer(() -> evaluator() ? Mono.empty() : monoJustStr)).subscribe(d -> System.out.println("##2##: "+d), e-> System.err.println(e));
    System.out.println("##2##: Done: " + (System.currentTimeMillis() - cur));

}

private static boolean evaluator() {
    System.out.println("Inside Evaluator");
    return false;
}

Output with succeed=true - Observe the sequence of "Inside Evaluator" and "MonoJust inside"

##1##: Begin
Inside Evaluator
MonoJust inside 540
MonoJustStr inside: 542
##1##: one
##1##: Done: 542



##2##: Begin
MonoJust inside 544
Inside Evaluator
MonoJustStr inside: 544
##2##: one
##2##: Done: 544

Below is the output with succeed = false. Note that in case ##2## the evaluator is not called.

##1##: Begin
Inside Evaluator
MonoJust inside 565
java.lang.RuntimeException: aaa
##1##: Done: 567



##2##: Begin
MonoJust inside 569
java.lang.RuntimeException: aaa
##2##: Done: 569
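
The ordering difference in these outputs follows from Java evaluating method arguments before the method is called: in monoJust.then(evaluator() ? ... : ...) the ternary, and therefore evaluator(), runs while the pipeline is still being assembled. Here is a rough plain-Java sketch of the same effect with no Reactor involved; then, thenDeferred, and the logged step names are invented for illustration and only mimic the shape of the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ArgumentEvaluationOrder {

    static final List<String> LOG = new ArrayList<>();

    static final Runnable FIRST = () -> LOG.add("first");
    static final Runnable EMPTY = () -> LOG.add("empty");
    static final Runnable SECOND = () -> LOG.add("second");

    static boolean evaluator() {
        LOG.add("evaluator");
        return false;
    }

    // Runs the first step, then the second step that was already chosen by the caller.
    static void then(Runnable first, Runnable second) {
        first.run();
        second.run();
    }

    // Runs the first step, and only afterwards asks the supplier for the second step.
    static void thenDeferred(Runnable first, Supplier<Runnable> second) {
        first.run();
        second.get().run();
    }

    static List<String> eager() {
        LOG.clear();
        // The ternary is an argument, so evaluator() runs before then() starts.
        then(FIRST, evaluator() ? EMPTY : SECOND);
        return new ArrayList<>(LOG);
    }

    static List<String> deferred() {
        LOG.clear();
        // The ternary sits inside a lambda, so evaluator() runs after the first step.
        thenDeferred(FIRST, () -> evaluator() ? EMPTY : SECOND);
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        System.out.println(eager());    // prints [evaluator, first, second]
        System.out.println(deferred()); // prints [first, evaluator, second]
    }
}
```

In this sketch, if the first step threw an exception, thenDeferred would never reach the supplier, which corresponds to the succeed = false run where case ##2## never prints "Inside Evaluator".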
Vidda answered 27/5, 2019 at 3:56 Comment(1)
consumer.error(new RuntimeError()) is unreachable code. - Elbrus
2

Simple answer for beginners:

Subscribing to justMono three times prints the same random integer three times, whereas each subscription to deferMono can print a different random number.

    Mono<Integer> justMono = Mono.just((new Random()).nextInt(10));

    // this will print the same random number three times
    for (int i = 0; i < 3; i++)
        justMono.subscribe(x -> {System.out.println("Just Mono: " + x);});

    Mono<Integer> deferMono = Mono.defer(() -> Mono.just((new Random()).nextInt(10)));

    // this might print three different random numbers
    for (int i = 0; i < 3; i++)
        deferMono.subscribe(x -> {System.out.println("Defer Mono: " + x);});

With Mono.just(), the value is computed only once, at the moment Mono.just() is called, before any subscription happens. With Mono.defer(), the supplier is invoked every time subscribe is called.

For more reference check this out: https://www.youtube.com/watch?v=eupNfdKMFL4&t=381s at 3:15 mins

Pompea answered 13/6, 2021 at 14:2 Comment(0)
