I am using reactor kafka and have a custom AvroDeserializer class for deserialization of messages.
Now I have a case where for certain payloads the deserialization class throws an exception.
My Kafka listener dies as soon as it tries to read such records.
I tried handling this exception using onErrorReturn
and using combination of (doOnError
and onErrorContinue
), however, it helped log the exception, but failed to consume subsequent records.
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
public T deserialize( String topic, byte[] data ) {
try {
//logic to deserialize
}
catch {
// throw new SerializationException("error deserializing" );
}
}
}
At the listener end, I'm trying to handle like this ->
@EventListener(ApplicationStartedEvent.class)
public class listener() {
KakfaReceiver<String, Object> receiver; // kafka listener
receiver.receive()
.delayUntil(do something)//logic to update the record to db
.doOnError(handle error)//trying to handle the exceptions including desrialization exception - logging them to a system
.onErrorContinue((throwable, o) -> log.info("continuing"))
.doOnNext(r->r.receiverOffset().acknowledge())
.subscribe()
}
One option is not to throw exception from the Deserializer class, but I want to log such exceptions in a separate system for analytics, and hence want handling of such records at the kafka listener end. Any thoughts on how this can be achieved?
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue
to continue on deser error. Note that producer should use avro + schema registry too to avoid deser errors. – Rettke