Mono switchIfEmpty() is always called
Asked Answered
F

2

65

I have two methods.
Main method:

@PostMapping("/login")
public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
    return socialService.verifyAccount(loginUser)
            .flatMap(socialAccountIsValid -> {
                if (socialAccountIsValid) {
                    return this.userService.getUserByEmail(loginUser.getEmail())
                            .switchIfEmpty(insertUser(loginUser))
                            .flatMap(foundUser -> updateUser(loginUser, foundUser))
                            .map(savedUser -> {
                                String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
                                return new ResponseEntity<>(HttpStatus.OK);
                            });
                } else {
                    return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
                }
            });

}

And this invoked method (the service calls an external api):

public Mono<User> getUserByEmail(String email) {
    UriComponentsBuilder builder = UriComponentsBuilder
            .fromHttpUrl(USER_API_BASE_URI)
            .queryParam("email", email);
    return this.webClient.get()
            .uri(builder.toUriString())
            .exchange()
            .flatMap(resp -> {
                if (Integer.valueOf(404).equals(resp.statusCode().value())) {
                    return Mono.empty();
                } else {
                    return resp.bodyToMono(User.class);
                }
            });
} 

In the above example, switchIfEmpty() is always called from the main method, even when a result with Mono.empty() is returned.

I cannot find a solution for this simple problem.
The following also doesn't work:

Mono.just(null) 

Because the method will throw a NullPointerException.

What I also can't use is the flatMap method to check that foundUser is null.
Sadly, flatMap doesn't get called at all in case I return Mono.empty(), so I cannot add a condition here either.

@SimY4

   @PostMapping("/login")
    public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
        userExists = false;
        return socialService.verifyAccount(loginUser)
                .flatMap(socialAccountIsValid -> {
                    if (socialAccountIsValid) {
                        return this.userService.getUserByEmail(loginUser.getEmail())
                                .flatMap(foundUser -> {
                                    return updateUser(loginUser, foundUser);
                                })
                                .switchIfEmpty(Mono.defer(() -> insertUser(loginUser)))
                                .map(savedUser -> {
                                    String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
                                    return new ResponseEntity<>(HttpStatus.OK);
                                });
                    } else {
                        return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
                    }
                });

    }
Faugh answered 25/1, 2019 at 23:0 Comment(12)
I am not sure whether I get this sentence right. switchIfEmpty() is always called from the main method, even when a result with Mono.empty() is returned.. It is meant to be called isnt it ?Alpestrine
Can you elaborate on your problem a bit more? ''switchIfEmpty() is always called from the main method, even when a result with Mono.empty() is returned''. This is the expected behaviour.Dalessandro
@Alpestrine What I want to achieve is that if the external service returns 404, I can return a Mono with value null from the service layer which can be handled by the main method. I guess I could throw an error too, but prefer not to. The 404 should be handled on the service layer, and when a user is not found, this is application logic which I feel I should handle with if, and not by exception handling. I'm going to review switfhIfEmpty in the docs. Still, a working suggestion?Faugh
@PrashantPandey Please see above comment.Faugh
@Trace, your code still works, if 404, you are returning Mono.empty() which is going to call switchIfEmpty. Anyways if you want to handle errors if that is what you are looking for then you can use onErrorResume() and handle appropriately or you can also use onErrorReturn(). guideAlpestrine
@guide The problem is that switchIfEmpty also gets called when the Mono is not empty, which I don't want. The condition would be -> insert if empty, else update.Faugh
definitely it cant be. Kindly enable the logs and share the logs outputAlpestrine
@Faugh This is how I would handle this: If the response contains a 404, throw a wrapped exception. Then, in the main method, fall back to an alternate Mono using onErrorResume(insertUser(loginUser)). If you don't want to throw an exception, then your logic should work just fine: return an empty observable and handle it using switchIfEmpty.Dalessandro
@Faugh The problem is that switchIfEmpty also gets called when the Mono is not empty: Are you sure? Can you debug a bit, since this should not happen.Dalessandro
@PrashantPandey The insertUser method in switchIfEmpty gets called before the flatMap method handling the 404 gets called. No idea why this is the case.Faugh
@Faugh Prolly a silly idea but could you add a .publishOn(Schedulers.parallel()) after the flatmap in the getUserByEmail method? Asking this since I am doing something very similar right now and it is working for me.Dalessandro
@Faugh can you please provide insertUser(loginUser) methodRequisition
C
106

It's because switchIfEmpty accepts Mono "by value". Meaning that even before you subscribe to your mono, this alternative mono's evaluation is already triggered.

Imagine a method like this:

Mono<String> asyncAlternative() {
    return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }));
}

If you define your code like this:

Mono<String> result = Mono.just("Some payload").switchIfEmpty(asyncAlternative());

It'll always trigger alternative no matter what during stream construction. To address this you can defer evaluation of a second mono by using Mono.defer

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.defer(() -> asyncAlternative()));

This way it will only print "Hi there" when alternative is requested

UPD:

Elaborating a little on my answer. The problem you're facing is not related to Reactor but to Java language itself and how it resolves method parameters. Let's examine the code from the first example I provided.

Mono<String> result = Mono.just("Some payload").switchIfEmpty(asyncAlternative());

We can rewrite this into:

Mono<String> firstMono = Mono.just("Some payload");
Mono<String> alternativeMono = asyncAlternative();
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

These two code snippets are semantically equivalent. We can continue unwrapping them to see where the problem lies:

Mono<String> firstMono = Mono.just("Some payload");
CompletableFuture<String> alternativePromise = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }); // future computation already tiggered
Mono<String> alternativeMono = Mono.fromFuture(alternativePromise);
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

As you can see future computation was already triggered at the point when we start composing our Mono types. To prevent unwanted computations we can wrap our future into a defered evaluation:

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.defer(() -> asyncAlternative()));

Which will unwrap into

Mono<String> firstMono = Mono.just("Some payload");
Mono<String> alternativeMono = Mono.defer(() -> Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }))); // future computation defered
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

In second example the future is trapped in a lazy supplier and is scheduled for execution only when it will be requested.

UPD: 2022:

Since some time project reactor comes with an alternative API for wrapping eagerly computed futures which results in the same - trapping eager computation in a lazy supplier:

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.fromCompletionStage(() -> alternativePromise()));
Cheerleader answered 29/1, 2019 at 1:50 Comment(10)
I tried this, but the method as you define asyncAlternative(), always gets triggered despite the Mono.defer().Faugh
@Faugh can you show me what you've tried? Because defer's only purpose is to not allow evaluation happen before time.Cheerleader
Please see updated post. In debug mode I've seen that the Mono.defer is called after, but that doesn't take away that it always gets executed, even when this.userService.getUserByEmail(loginUser.getEmail()) does not return Mono.empty().Faugh
It'll always trigger alternative no matter what during stream construction. Then what's the use. It's a method named switchIfEmpty. Some things really don't make sense.Faugh
Thanks for the explanation. But as you see in my code example, I use your deferred method, but the callback still always gets executed despite using defer. The getUserByEmail returns a Mono, so do the methods insertUser and updateUser. I really don't understand why.Faugh
Your answer was correct. I broke it down, turned out that the reason why switchIfEmpty got triggered was because in fact updateUser returned an empty body with http status code 204! I modified the api a bit reluctantly, but now it works correctly. Thanks for this!Faugh
Do we really need Mono.defer(() -> Mono.fromFuture(CompletableFuture.supplyAsync(() -> { System.out.println("Hi there"); return "Alternative"; }))); as whole or just Mono.defer(() -> "Alternative"); is enough?Vitriform
@Vitriform the rule of thumb is: if the alternative branch has side effects - then defer it. Mono.defer(() -> ”Alternative”) has no side effects so deferring is not necessary.Cheerleader
I understood defer, my question is do we need Mono.fromFutuer(CompletableFuter.supplyAsync(())... with defer? or just Mono.defer() is enough? (Let's assume branch has side effect.)Vitriform
You don't need a future to make reactive code concurrent. In fact, I’d recommend avoiding that because of the issues described above. But sometimes it's a necessity.Cheerleader
D
42

For those who, despite the well voted answer, do not still understand why such a behaviour:

Reactor sources (Mono.xxx & Flux.xxx) are either:

  • Lazily evaluated : the content of the source is evaluated/triggered only when a subscriber subscribes to it;

  • or eagerly evaluated : the content of the source is immediately evaluated even before the subscriber subscribes.

Expressions like Mono.just(xxx), Flux.just(xxx), Flux.fromIterable(x,y,z) are eager.

By using defer(), you force the source to be lazily evaluated. That's why the accepted answer works.

So doing this:

 someMethodReturningAMono()
  .switchIfEmpty(buildError());

with buildError() relying on an eager source to create an alternative Mono will ALWAYS be evaluated before the subscription:

Mono<String> buildError(){
       return Mono.just("An error occured!"); //<-- evaluated as soon as read
}

To prevent that, do this:

 someMethodReturningAMono()
  .switchIfEmpty(Mono.defer(() -> buildError()));

Read this answer for more.

Deal answered 19/2, 2021 at 16:44 Comment(1)
simply explained it. thank you so much.Delphina

© 2022 - 2024 — McMap. All rights reserved.