Continue consuming subsequent records in reactor kafka after deserialization exception

I am using Reactor Kafka and have a custom AvroDeserializer class for deserializing messages.

For certain payloads the deserializer throws an exception, and my Kafka listener dies as soon as it tries to read such a record. I tried handling the exception with onErrorReturn, and with a combination of doOnError and onErrorContinue. That let me log the exception, but the listener still failed to consume subsequent records.

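import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
   @Override
   public T deserialize( String topic, byte[] data ) {
       try {
         // logic to deserialize and return the record
       }
       catch ( Exception e ) {
          // rethrow, keeping the original failure as the cause
          throw new SerializationException( "error deserializing", e );
       }
   }
}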

On the listener side, I'm trying to handle it like this:

@EventListener(ApplicationStartedEvent.class)
public void listen() {
   // createReceiver(), updateDb() and reportError() are placeholders for logic omitted from the question
   KafkaReceiver<String, Object> receiver = createReceiver(); // Kafka receiver
   receiver.receive()
      .delayUntil( record -> updateDb( record ) )   // logic to update the record in the DB
      .doOnError( error -> reportError( error ) )   // log exceptions, including deserialization exceptions, to a separate system
      .onErrorContinue( (throwable, o) -> log.info( "continuing" ) )
      .doOnNext( r -> r.receiverOffset().acknowledge() )
      .subscribe();
}

One option is not to throw an exception from the deserializer class, but I want to log such exceptions in a separate system for analytics, so I want to handle such records on the Kafka listener side. Any thoughts on how this can be achieved?

Onepiece answered 17/7, 2020 at 20:21 Comment(1)
The problem is that you do not acknowledge the message before the error occurs. So when onErrorContinue runs, it requests a new element to continue the stream, and that element is the last failed one again. onErrorReturn cancels the stream and emits the provided fallback value. onErrorResume switches to an alternate stream. Use spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue to continue on deserialization errors. Note that the producer should use Avro with a schema registry too, to avoid deserialization errors. - Rettke

It looks like the suggestion in the comment about using

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue

will work in most cases, but I couldn't figure out a way to make it work with Reactor Kafka. For now, I went with the approach of not throwing an exception from the deserializer and logging the failure there instead. That solves the issue of the Kafka consumer being unable to consume subsequent records after a poison record.

Onepiece answered 16/8, 2020 at 10:43 Comment(0)

I think your problem is that you throw an exception, and any exception is a terminal event in the context of a reactive stream. Just don't throw an exception.

Wrap the T that you return from deserialize() in a class that can indicate whether deserialization succeeded or failed. If you don't care about the exact error, you can use Optional<T>, where an empty Optional indicates a deserialization error. If you are using the Vavr library, you can use Option<T> from there, or Try<T> to keep the actual deserialization error on the Failure side.

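For example, your deserializer would look like this:

import java.util.Optional;

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<Optional<T>> {
   @Override
   public Optional<T> deserialize( String topic, byte[] data ) {
       try {
         // logic to deserialize into result
         return Optional.of( result );
       }
       catch ( Exception e ) {
          // signal the failure with an empty Optional instead of throwing
          return Optional.empty();
       }
   }
}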

Now that you are no longer throwing, your flux stays alive and you can for example filter out empty Optional instances.
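As a rough illustration of that filtering step, here is a minimal sketch that runs without Kafka or Reactor on the classpath. Everything in it is an assumption made for the example: java.util.stream stands in for the Flux, and the hypothetical deserialize() parses an integer in place of the real Avro logic.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class OptionalFilterSketch {

    // Hypothetical stand-in for the Avro logic: parses an integer and
    // returns an empty Optional instead of throwing on a bad payload.
    static Optional<Integer> deserialize(byte[] data) {
        try {
            return Optional.of(Integer.parseInt(new String(data, StandardCharsets.UTF_8)));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Keeps only the payloads that deserialized successfully.
    static List<Integer> consume(List<byte[]> payloads) {
        return payloads.stream()
                .map(OptionalFilterSketch::deserialize)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<byte[]> payloads = List.of(
                "1".getBytes(StandardCharsets.UTF_8),
                "poison".getBytes(StandardCharsets.UTF_8),
                "3".getBytes(StandardCharsets.UTF_8));
        System.out.println(consume(payloads)); // prints [1, 3]
    }
}
```

Flux offers filter and map as well, so the same two steps should carry over to the records emitted by receive(), applied to each record's value().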

P.S. If you don't want to use wrapper types, you can return null instead of throwing and filter out null values. I would not recommend that, though: it is easy to forget about that deserialize() behavior and get a NullPointerException later.
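Since the question asks for the failure itself to be reported to a separate system, an empty Optional or a null would lose the cause. Below is a minimal sketch of the Try-style alternative mentioned above, written without Vavr. DeserializationResult, ResultSketch and the errorSink parameter are hypothetical names invented for this example, and integer parsing again stands in for the real Avro logic.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical Try-like wrapper: holds either a value or the exception
// that prevented it from being produced.
final class DeserializationResult<T> {
    private final T value;
    private final Exception error;

    private DeserializationResult(T value, Exception error) {
        this.value = value;
        this.error = error;
    }

    static <T> DeserializationResult<T> success(T value) {
        return new DeserializationResult<>(value, null);
    }

    static <T> DeserializationResult<T> failure(Exception error) {
        return new DeserializationResult<>(null, error);
    }

    boolean isSuccess() { return error == null; }
    T value() { return value; }
    Exception error() { return error; }
}

class ResultSketch {

    // Stand-in for the deserializer: captures the exception instead of throwing it.
    static DeserializationResult<Integer> deserialize(byte[] data) {
        try {
            return DeserializationResult.success(
                    Integer.parseInt(new String(data, StandardCharsets.UTF_8)));
        } catch (NumberFormatException e) {
            return DeserializationResult.failure(e);
        }
    }

    // Listener side: successes are returned, failures go to errorSink,
    // which stands in for the separate analytics system.
    static List<Integer> consume(List<byte[]> payloads, List<Exception> errorSink) {
        List<Integer> values = new ArrayList<>();
        for (byte[] payload : payloads) {
            DeserializationResult<Integer> result = deserialize(payload);
            if (result.isSuccess()) {
                values.add(result.value());
            } else {
                errorSink.add(result.error());
            }
        }
        return values;
    }
}
```

The deserializer still never throws, so the stream stays alive, but the listener now has the original exception available for logging.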

Tasia answered 12/12, 2020 at 21:39 Comment(0)
