I am using Reactor Kafka to send Kafka messages and to receive and process them. While receiving a Kafka payload, I do some deserialization, and if there is an exception, I want to just log that payload (by saving it to Mongo) and then continue receiving other payloads.
For this I am using the approach below:
@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
    for (Flux<ReceiverRecord<String, Object>> flux : kafkaService.getFluxReceives()) {
        flux.delayUntil(/* some function to do something */)
            .doOnNext(r -> r.receiverOffset().acknowledge())
            .onErrorResume(this::handleException) // here I'll just save to mongo
            .subscribe();
    }
}

private Publisher<? extends ReceiverRecord<String, Object>> handleException(Throwable ex) {
    // save to mongo
    return Flux.empty();
}
Here I expect that whenever I encounter an exception while receiving a payload, onErrorResume should catch it and log to Mongo, and then I should be able to continue receiving more messages as they are sent to the Kafka topic. However, after the exception, even though the onErrorResume method gets invoked, I am no longer able to process any messages sent to the Kafka topic. Is there anything I might be missing here?