Using onErrorResume to handle problematic payloads posted to Kafka using Reactor Kafka

I am using Reactor Kafka to send messages into Kafka and to receive and process them. While deserializing a received Kafka payload, if there is an exception, I want to just log that payload (by saving it to Mongo) and then continue receiving other payloads.

For this I am using the approach below:

@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
    for (Flux<ReceiverRecord<String, Object>> flux : kafkaService.getFluxReceives()) {
        flux.delayUntil(r -> process(r)) // some function to do something
            .doOnNext(r -> r.receiverOffset().acknowledge())
            .onErrorResume(this::handleException) // here I'll just save to mongo
            .subscribe();
    }
}

private Publisher<? extends ReceiverRecord<String, Object>> handleException(Throwable ex) {
    // save to mongo
    return Flux.empty();
}

Here I expect that whenever I encounter an exception while receiving a payload, onErrorResume should catch it and log it to Mongo, and then I should be good to continue receiving more messages sent to the Kafka queue. However, I see that after the exception, even though the onErrorResume method gets invoked, I am not able to process any more messages sent to the Kafka topic. Is there anything I might be missing here?

Roof answered 16/7, 2020 at 17:36 Comment(0)

If you need to handle the error gracefully, you can add onErrorResume inside delayUntil:

flux
    .delayUntil(r -> process(r)
        .onErrorResume(e -> saveToMongo(r)))
    .doOnNext(r -> r.receiverOffset().acknowledge())
    .subscribe();

Reactive operators treat errors as terminal signals, so if your inner logic (inside delayUntil) throws, delayUntil terminates the whole sequence, and an onErrorResume placed after delayUntil will not make it continue processing events from Kafka.
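To illustrate with a minimal sketch in plain Reactor (no Kafka involved; the values and exception here are made up for demonstration): once the error escapes the inner trigger, the whole sequence terminates, so later elements are never processed even though onErrorResume swallows the error.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TerminalErrorDemo {
    public static void main(String[] args) {
        Flux.just(1, 2, 3)
            // the inner trigger fails for element 2...
            .delayUntil(i -> i == 2
                    ? Mono.<Integer>error(new IllegalStateException("boom"))
                    : Mono.just(i))
            // ...and this outer handler swallows the error, but the upstream
            // is already terminated: element 3 is never emitted
            .onErrorResume(e -> Flux.empty())
            .subscribe(System.out::println); // prints only: 1
    }
}

Moving the onErrorResume inside the delayUntil lambda, onto the trigger publisher itself (as in the snippet above this sketch), keeps the error from ever reaching the outer sequence, which is why the consumer stays alive.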

Nonrepresentational answered 17/7, 2020 at 10:16 Comment(3)
Thanks @bsideup. I think this should work for other exceptions that my application throws. But for one particular type of exception I am still wondering if this will work, i.e. a deserialization exception. Since that happens while the Kafka listener is receiving the record, what currently happens in my application is that it jumps directly to onErrorResume, but somehow that doesn't keep the consumer alive, and it fails to consume subsequent records. I have the option of not throwing that exception from the deserializer class, but I want to capture it here at the listener. Any thoughts around that? – Roof
I would recommend returning a special value from the deserializer that indicates the deserialization failed. You can handle it later in process. Otherwise, an error thrown from the deserializer will affect the whole batch of records from poll(). – Nonrepresentational
Thanks @bsideup, I think this makes sense. I ultimately went ahead with not throwing the exception from the deserializer, since Kafka is not able to commit the offset for that record, and there is no clean way of ignoring it and going ahead with further consumption, as we don't have the offset information of the (malformed) record. So even if I try to ignore the record using reactive error operators, the poll fetches the same record again, and the consumer is effectively stuck. – Roof

As mentioned by @bsideup too, I ultimately went ahead with not throwing the exception from the deserializer, since Kafka is not able to commit the offset for that record, and there is no clean way of ignoring that record and going ahead with further consumption of records, as we don't have the offset information of the (malformed) record. So even if I try to ignore the record using reactive error operators, the poll fetches the same record again, and the consumer is effectively stuck.

Roof answered 21/8, 2020 at 17:13 Comment(1)
How exactly did you do "not throwing exception from the deserializer"? Did you implement your own deserializer? – Impair
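For anyone wondering how the "own deserializer" route can look: a hedged sketch, assuming a wrapping deserializer that catches failures and returns a sentinel carrying the raw bytes instead of throwing. The names FailSafeDeserializer and FailedPayload are illustrative, not from the original thread, and the snippet assumes a Kafka client version (2.x+) in which Deserializer's configure and close methods have default implementations.

import org.apache.kafka.common.serialization.Deserializer;

// Sentinel carrying the raw bytes of a payload that failed to deserialize,
// so the malformed payload can still be logged (e.g. saved to Mongo) downstream.
final class FailedPayload {
    final byte[] raw;
    FailedPayload(byte[] raw) { this.raw = raw; }
}

// Wraps a real deserializer; on failure it returns the sentinel instead of
// throwing, so the poll() batch is not poisoned and the offset of the bad
// record can be acknowledged like any other.
public class FailSafeDeserializer implements Deserializer<Object> {

    private final Deserializer<Object> delegate;

    public FailSafeDeserializer(Deserializer<Object> delegate) {
        this.delegate = delegate;
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (Exception e) {
            return new FailedPayload(data);
        }
    }
}

Because this wrapper has no no-arg constructor, it would be passed programmatically (for example via ReceiverOptions.withValueDeserializer in Reactor Kafka) rather than by class name in the consumer properties; the processing step can then check whether a record's value is a FailedPayload, save it to Mongo, and acknowledge the offset as usual.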
