Let's say I have an array of ids: [9, 8, 7, 6]
.
I do some processing and one element causes to throw an exception. I want to handle this situation on my own way (let's say log it) and let the other elements go with the flow.
How can I know which one was it? I need to have this element in my onError
processing.
Flux.fromArray(myArray)
.flatMap(element -> {
var foo = processMyEl(element);
return anotherProcess(foo); // this returns Mono
})
.onErrorOperator(element -> handleMyError(element)) // this line is what I need
So, what I saw, there's this almost nice .onErrorContinue((error, obj) ->
that emits an error and an object.
But this obj
is not the element
that caused the exception but the object that did so. It happens inside of my processing methods and it doesn't have to be the same type of object every time.
.onErrorReturn(...)
- not really what I want
.doOnError(error ->
- no information of my element
.onErrorResume(error ->
- same as above
there were suggestions that I can create my own Exception and pass there the element and then retrieve it from the exception. But how should I throw the exception?
Should I go with an old way of try catch:
Flux.fromArray(myArray)
.flatMap(el -> {
try {
var foo = processMyEl(el);
return anotherProcess(foo); // this returns Mono
} catch (Exception e) {
return Mono.error(new MyException(el));
}
})
.onErrorOperator(error -> handleMyError(error.getElement()))
It doesn't look well
Edit:
Not only it looks bad, but also doesn't work. The exception is not caught at all and triggers directly doOnTerminate()
and stops the whole stream
Update:
Thanks to @JEY I used .onErrorResume()
inside flatMap
.
I also transformed first method to be a reactive stream by Mono.defer(() -> Mono.just(processMyEl(el)))
.
Just as a note: using Mono.defer()
allows me to use onErrorResume
since Mono.just()
cannot signal errors.
Final code looks like this:
Flux.fromArray(myArray)
.flatMap(element -> Mono.defer(() -> Mono.just(processMyEl(element)))
.onErrorResume(th -> handleMyError(element, th))
)
.flatMap(foo -> anotherProcess(foo)
.onErrorResume(th -> handleMyError(foo, th)
)
Where:
private Mono<> handleMyError(el, th) {
// handling code
return Mono.empty()
}
Mono
- as I mentioned in the question, and the exception can be thrown there, it's not going to be caught by try catch since it's not in its scope anymore – OlenolinMono.just(processMyEl(el))
and then applyonErrorResume
like you suggested to both of them, feel free to post an answer and I'll accept it; I'll also post my final code – Olenolin