How to get an element that caused an exception in Flux?
Asked Answered
O

2

6

Let's say I have an array of ids: [9, 8, 7, 6].

I do some processing and one element causes to throw an exception. I want to handle this situation on my own way (let's say log it) and let the other elements go with the flow.

How can I know which one was it? I need to have this element in my onError processing.

Flux.fromArray(myArray)
  .flatMap(element -> {
    var foo = processMyEl(element);  
    return anotherProcess(foo); // this returns Mono
  })
  .onErrorOperator(element -> handleMyError(element)) // this line is what I need
  

So, what I saw, there's this almost nice .onErrorContinue((error, obj) -> that emits an error and an object.

But this obj is not the element that caused the exception but the object that did so. It happens inside of my processing methods and it doesn't have to be the same type of object every time.

.onErrorReturn(...) - not really what I want

.doOnError(error -> - no information of my element

.onErrorResume(error -> - same as above

there were suggestions that I can create my own Exception and pass there the element and then retrieve it from the exception. But how should I throw the exception?

Should I go with an old way of try catch:

Flux.fromArray(myArray)
  .flatMap(el -> {
    try {
      var foo = processMyEl(el);  
      return anotherProcess(foo); // this returns Mono
    } catch (Exception e) {
      return Mono.error(new MyException(el));
     }
    })
  .onErrorOperator(error -> handleMyError(error.getElement()))

It doesn't look well

Edit:

Not only it looks bad, but also doesn't work. The exception is not caught at all and triggers directly doOnTerminate() and stops the whole stream

Update:

Thanks to @JEY I used .onErrorResume() inside flatMap.

I also transformed first method to be a reactive stream by Mono.defer(() -> Mono.just(processMyEl(el))).

Just as a note: using Mono.defer() allows me to use onErrorResume since Mono.just() cannot signal errors.

Final code looks like this:

Flux.fromArray(myArray)
    .flatMap(element -> Mono.defer(() -> Mono.just(processMyEl(element)))
        .onErrorResume(th -> handleMyError(element, th))
    )
    .flatMap(foo -> anotherProcess(foo)
        .onErrorResume(th -> handleMyError(foo, th)
    )

Where:

private Mono<> handleMyError(el, th) {
  // handling code
  return Mono.empty()
}
Olenolin answered 30/11, 2020 at 22:23 Comment(9)
You should go with you old try-catch but don't return Mono of error handle your error and return Mono.empty() so that your error won't be seen down stream and flux will keep getting consume.Wardle
yeah, I tried to have my logic in catch clause but the code doesn't even reach there, just terminates my stream and that's allOlenolin
if it doesn't reach it, it means that the error is thrown somewhere else. Without more info we won't be able to help.Wardle
no, it's being thrown inside one of those two methods, but since one method returns Mono - as I mentioned in the question, and the exception can be thrown there, it's not going to be caught by try catch since it's not in its scope anymoreOlenolin
don't use a try-catch then but a anotherProcess.onErrorResume(th -> { LOG.error("error", th); return Mono.emtpy();})Wardle
and how can I get my element there?Olenolin
You have direct access to it anotherProcess.onErrorResume(th -> { LOG.error("error with element", element, th); return Mono.emtpy();}) or anotherProcess.onErrorResume(th -> { LOG.error("error with element", element, th); return handleMyError(element).then();})Wardle
you're right! what I also did was to get rid of try catch entirely and transform to reactive stream by Mono.just(processMyEl(el)) and then apply onErrorResume like you suggested to both of them, feel free to post an answer and I'll accept it; I'll also post my final codeOlenolin
alright then glad to help.Wardle
W
2

As requested by @Kamil I'll add my comments as an answer:

You should just handle the error in the flatMap and return a Mono.empty() to discard it do something like:

Flux.fromArray(myArray)
  .flatMap(el -> anotherProcess(processMyEl(el)).onErrorResume(th -> handleError(th, el))

With handle error like:

Mono<Void> handleError(Throwable th, Object element) {
    LOG.error("An error occurred on {}", element, th);
    return Mono.empty()
}

Or if you want to do something more complex that require async:

Mono<Void> handleError(Throwable th, Object element) {
    return doSomethingThaReturnFluxOrMono(element).then();
}
Wardle answered 1/12, 2020 at 14:47 Comment(0)
G
-1
} catch (Exception e) {
    throw new MyException(el, e);
}
Gentlewoman answered 1/12, 2020 at 9:34 Comment(1)
I cannot throw an exception in catch clause in a flatMap, having return Mono.error(new MyException(el)) or return Flux.error(new MyException(el)) is the right way to signal an exception down the streamOlenolin

© 2022 - 2024 — McMap. All rights reserved.