The Reactor error handling documentation (https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling) states that error-handling operators do not let the original sequence continue.
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
But the javadoc for onErrorContinue states the following (https://projectreactor.io/docs/core/3.4.10/api/index.html) -
Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.
Is onErrorContinue not considered an "error-handling operator"?
It does seem to allow the original sequence to continue -
Flux.range(1, 5)
.map(i -> {
if (i == 3) {
throw new RuntimeException("Forcing exception for " + i);
}
return i;
})
.doOnNext(i -> System.out.println(i))
.onErrorContinue((throwable, o) -> System.err.println("Error while processing " + o + " - " + throwable.getMessage()))
.subscribe();
Result (Dropped 3 but continued with subsequent elements)
1
2
4
5
Error while processing 3 - Forcing exception for 3
Process finished with exit code 0
The documentation does state that onErrorContinue is dependent on operator support. Is there any other way to let the original sequence (source Flux) continue that works for all operators? I dont want an alternate flux to replace my source flux in case of errors (the onErrorResume behaviour) - I just want to ignore the problem element & continue with the source flux.
EDIT 1 (My use case)
I have a reactor kafka source flux & i want to consume from it infinitely regardless of errors. I was using onErrorContinue but based on feedback received on this post, i have replaced it with onErrorResume. Below is the code i have at this point but i am not sure whether it will work in all cases (by "work", i am streaming continuously from kafka regardless of any errors). Any suggestions please?
KafkaReceiver.create(receiverOptions)
.receive()
.flatMap(record -> processRequest(record.value())
.doOnNext(e -> record.receiverOffset().acknowledge())
.doOnError(e -> {
System.err.println("Error occurred for msg: " + record.value() + ", Error " + e);
record.receiverOffset().acknowledge();
})
.onErrorResume(e -> Mono.empty()))
.repeat(() -> true)
.retryWhen(Retry.indefinitely())
.doFinally(signalType -> {
//dont expect control to ever reach here
System.err.println("KafkaReceiverFlux terminating with Signal type: " + signalType);
})
.subscribe();