Correct way of throwing exceptions with Reactor
Asked Answered
P

1

132

I'm new to project Reactor and reactive programming in general.

I'm currently working on a piece of code similar to this:

Mono.just(userId)
    .map(repo::findById)
    .map(user-> {
        if(user == null){
            throw new UserNotFoundException();
        }
        return user;
    })
    // ... other mappings

This example is probably silly and there are surely better ways of implementing this case, but the point is:

Is it wrong to use a throw new exception in a map block or should I replace this with a return Mono.error(new UserNotFoundException())?

Is there any actual difference in these two ways of doing?

Predestination answered 3/12, 2018 at 14:3 Comment(1)
You would need to use flatMap() to return the exception. That being said, I personally prefer this approach, as it is likelier faster, and, perhaps, less messy.Galbraith
F
290

There are a couple of ways that could be considered as a convenient way of exception throwing:

Handle your element using Flux/Mono.handle

One of the way that could simplify handling of an element which may result in an error or empty stream is operator handle.

The following code shows how we can use it in order to solve our problem:

Mono.just(userId)
    .map(repo::findById)
    .handle((user, sink) -> {
        if(!isValid(user)){
            sink.error(new InvalidUserException());
        } else if (isSendable(user))
            sink.next(user);
        }
        else {
            //just ignore element
        }
    })

as we can see, the .handle operator requires to pass BiConsumer<T, SynchronousSink<> in order to handle an element. Here we have two parameters in our BiConsumer. The first one is an element from the upstream where the second one is SynchronousSink which helps us to supply element to downstream synchronously. Such a technique expands an ability to supply different results of our element's processing. For example, in case the element is invalid, we can supply error to the same SycnchronousSync which will cancel upstream and produce onError signal to downstream. In turn, we can "filter" using the same handle operator. Once the handle BiConsumer is executed and no element has been supplied, Reactor will consider that as kind of filtering and will request for an additional element for us. Finally, in case the element is valid, we can simply call SynchronousSink#next and propagate our element downstream or apply some mapping on it, so we will have handle as the map operator here. Moreover, we can safely use that operator with no-performance impact and provide complex element verification such as validation of element or error sending to downstream.

Throws using #concatMap + Mono.error

One of the options to throw an exception during mapping is to replace map with concatMap. In its essence, concatMap does almost the same flatMap does. The only difference is that concatMap allows only one substream at a time. Such behavior simplifies internal implementation a lot and does not impact performance. So we can use the following code in order to throw an exception in a more functional way:

Mono.just(userId)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Mono.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

In the sample above in case of an invalid user, we return exception using Mono.error. The same we can do for flux using Flux.error:

Flux.just(userId1, userId2, userId3)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Flux.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

Note, in both cases we return cold stream which has only one element. In Reactor, there is a couple of optimizations that improve performance in the case returned stream is a cold scalar stream. Thus, it is recommended to use Flux/Mono concatMap + .just, empty, error as a result when we need more complex mapping, that could end up with return null or throw new ....

Attention! Don't ever check incoming element on nullability. The Reactor Project will never send a null value for you since this violates Reactive Streams spec (see Rule 2.13) Thus, in case if repo.findById returns null, Reactor will throw NullPointerException for you.

Wait, Why concatMap is better than flatMap?

In its essence, flatMap is designed to merge elements from the multiple substreams that is executing at a time. It means that flatMap should have asynchronous streams underneath so, they could potentially process data on the multiple threads or that could be a several network calls. Subsequently, such expectations impact implementation a lot so flatMap should be able to handle data from the multiple streams (Threads) (means usage of concurrent data structures), enqueue elements if there is a draining from another stream (means additional memory allocation for Queues for each substream) and do not violate Reactive Streams specification rules (means really complex implementation). Counting all these facts and the fact that we replace a plain map operation (which is synchronous) onto the more convenient way of throwing an exception using Flux/Mono.error (which does not change synchronicity of execution) leads to the fact that we do not need such a complex operator and we can use much simpler concatMap which is designed for asynchronous handling of a single stream at a time and has a couple of optimization in order to handle scalar, cold stream.

Throws exception using switchIfEmpty

So, another approach to throw an exception when the result is empty is switchIfEmpty operator. The following code demonstrates how we can use that approach :

Mono.just(userId)
    .flatMap(repo::findById)
    .switchIfEmpty(Mono.error(new UserNotFoundExeception()))

As we can see, in this case repo::findById should have Mono of User as the return type. Therefore, in case a User instance will not be found, the result stream will be empty. Thus, Reactor will call an alternative Mono, specified as switchIfEmpty parameter.

Throw your exception as is (e.g. in your map, filter and other similar operators)

It could be counted as a less readable code or bad practice (my own opinion), but you can throw your exception as is (e.g. .map(v -> throw ...)) with Project Reactor. Even though, in someway doing so can violate Reactive Streams specification (in this context violate from the semantic perspective, because your operator under the hood is a Subscriber in a chain of Subscribers, therefore - semantically, throwing an exception in lambda could be mapped to throwing an exception in the onNext method which violates the spec's rule 2.13). However, since Reactor will catch the thrown exception for you and propagate it then as the onError signal to your downstream, it is not prohibited to do that.

Takeaways

  1. Use .handle operator in order to provide complex element processing
  2. Use concatMap+ Mono.error when we need to throw an exception during mapping but such a technique is most suitable for cases of asynchronous element processing.
  3. Use flatMap + Mono.error when we have already had flatMap in place
  4. Null as a return type is forbidden so instead of null in your downstream map you will get unexpected onError with NullPointerException
  5. Use switchIfEmpty in all cases when you need to send an error signal if the result of calling some specific function finished with the empty stream
Fungible answered 3/12, 2018 at 15:0 Comment(20)
note that only the null is problematic in the original map test. map will catch exceptions and propagate them as onError signals, so a map that throws an unchecked exception is fine.Checklist
Yup, but I don't think it is a good pattern for throwing exceptionsFungible
this and handle (which can generate the empty case) have far less overhead than concatMap + Mono.error thoughChecklist
Yeah, mentioned that. Nevertheless, sometime it would be preferable to have more readable code.Fungible
@OlehDokuka I believe it's been added to reactor in one of the newer versions but I'd suggest using supplier, when throwing an exception with .swithIfEmpty, otherwise the exception is created eagerly, even if if-empty is never reached.Abyss
Can you show some links of the specification which states, that this is bad practice? I wanted to show it to a colleague, but I just couldn't find it.Aristocratic
Reactive Streams specification say nothing about Mono and Flux. It specifies behaviour of Publishers and Subscribers. So throwing exception in one of the Mono's method can't violate Reactive Streams specification.Thornhill
@Lu55 You are right, it does not say anything about Mono and Flux cuz it is a specific implementation. I'm expressing my own opinion on throwing exceptions in lambda which from my standpoint is not the best practice in functional programming. However, it is up to you on how you are going to write and style your sourcesFungible
@OlehDokuka Thanks for the clarification! You got me right. Of course I meant "in one of the lambdas which are passed to the Mono", not the internal code of Mono class.Thornhill
Do we still have the concatMap() method?Swish
@JinKwon yes. In Flux we still have concatMap. In Mono the same method is called flatMapFungible
switchOnEmpty should use Mono.defer(() -> Mono.error(new UserNotFoundExeception())) because otherwise that code will generate every time an exception and a mono even when not emptyStuppy
@Stuppy there is Mono.error(Supplier) which eliminate the need for Mono.deferFungible
.switchIfEmpty(Mono.error(new UserNotFoundExeception())) does not work, it just throw NullPointerExceptionExaggerate
@Exaggerate I would appreciate the full context or a code sample that reproduces your issue.Fungible
Maybe I missed it but the answer does not tell why to use .concatMap vs .map, only why .concatMap vs .flatMap ? Ther reason is what to have just Mono.error? See no problem with OP code.Pygmy
@OlehDokuka I see thanks, from what I've seen myself and my team is doing -simpler is better. My main concern is each operator carries a context info, in 99 % cases exceptions which we throw are coming from chunks doing simple non-io validation, therefore changing to flatMap / concatMap for the sake of Mono.error is giving wrong impression to the reader that the lambda is doing some IO. Operator choice must communicate operation nature exactly. In your case I suspect many will follow this and start "Monojusting" for zero reason where operation does not need to be async to support this sugar.Pygmy
@Aubergine, that is why I suggest using the handle operator, to propagate error without using the throw keywordFungible
@OlehDokuka It's a great answer, thanks! However, you should fix your example with Mono as there is not a concatMap() method there. Now it is confusing and your code does not work when copied and pasted to the Java compiler.Garvin
My problem with these suggestions (except for the last one that OP suggested too): you cannot choose to throw no exception, as by default your code may yield exceptions that you don't foresee. So if the library really prefers you don't throw exceptions in .map, it prefers you to always surround your code with try/catch(Throwable), which is littering and badly readable.Romonaromonda

© 2022 - 2024 — McMap. All rights reserved.