How to catch deserialization error in Kafka-Spring?

I'm setting up an application that consumes Kafka messages.

I followed the Spring docs on deserialization error handling in order to catch deserialization exceptions, and I've tried the failedDeserializationFunction approach.

This is my consumer configuration class:

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        
        /*  Error Handling */
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);

        return consumerProps;
    }

    @Bean
    public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(NTCMessageBody.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

This is the BiFunction provider:

public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {

    @Override
    public NTCMessageBody apply(byte[] t, Headers u) {
        return new NTCBadMessageBody(t);
    }

}

public class NTCBadMessageBody extends NTCMessageBody {

    private final byte[] failedDecode;

    public NTCBadMessageBody(byte[] failedDecode) {
        this.failedDecode = failedDecode;
    }

    public byte[] getFailedDecode() {
        return this.failedDecode;
    }

}

When I send just one corrupted message to the topic, I get this error (in a loop):

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value

My understanding was that the ErrorHandlingDeserializer2 should delegate to the NTCBadMessageBody type and continue consumption. I also saw (in debug mode) that it never entered the constructor of the NTCBadMessageBody class.

Salvo answered 30/4, 2019 at 15:9 Comment(0)

ErrorHandlingDeserializer

When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException, the container’s ErrorHandler is called with the failed ConsumerRecord. When using a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a DeserializationException.
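
For the batch case described above, a listener-side check could look roughly like this (a sketch, not from the original answer; the topic name, payload type, and logger are placeholders):

@KafkaListener(topics = "my-topic", containerFactory = "kafkaListenerContainerFactory")
public void listenBatch(List<ConsumerRecord<String, Object>> records) {
    for (ConsumerRecord<String, Object> record : records) {
        // with the ErrorHandlingDeserializer, a failed value arrives as the DeserializationException itself
        if (record.value() instanceof DeserializationException) {
            DeserializationException ex = (DeserializationException) record.value();
            log.error("Skipping bad record at {}-{} offset {}", record.topic(), record.partition(), record.offset(), ex);
            continue;
        }
        // normal processing of record.value()
    }
}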

So, since your code uses a record-level MessageListener, just add an ErrorHandler to the container.

Handling Exceptions

If your error handler implements this interface you can, for example, adjust the offsets accordingly. For example, to reset the offset to replay the failed message, you could do something like the following; note however, these are simplistic implementations and you would probably want more checking in the error handler.

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
    return (m, e, c) -> {
        this.listen3Exception = e;
        MessageHeaders headers = m.getHeaders();
        c.seek(new org.apache.kafka.common.TopicPartition(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                headers.get(KafkaHeaders.OFFSET, Long.class));
        return null;
    };
}
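
For completeness (this wiring is not shown in the original answer; the topic name is a placeholder), the error handler bean is then referenced by name from the listener:

@KafkaListener(id = "list3", topics = "myTopic", errorHandler = "listen3ErrorHandler")
public void listen3(String data) {
    // normal processing
}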

Or you can do a custom implementation, as in this example:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setErrorHandler(new ErrorHandler() {
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");
        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {

        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
            String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");
        }
    });

    return factory;
}
Wurst answered 30/4, 2019 at 19:17 Comment(3)
I'm trying it... but it seems there is a problem in your second example, because I got "The method setErrorHandler(new ErrorHandler(){}) is undefined for the type ContainerProperties". (Salvo)
Making this correction, factory.setErrorHandler(new ErrorHandler()), it works great! Thanks a lot! (Salvo)
Is there a way to access partition information (actually TopicPartition) in the custom implementation above for any given exception? We want to catch exceptions and log them to the database and then increase the offset on the partition. But by parsing the exception message as above, we can do it only for SerializationException. (Pastypat)

Use ErrorHandlingDeserializer.

When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException, the container’s ErrorHandler is called with the failed ConsumerRecord. When using a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a DeserializationException.

You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer instances configured with the proper delegates. Alternatively, you can use consumer configuration properties, which are used by the ErrorHandlingDeserializer to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS; the property value can be a class or a class name.
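
For the constructor-based option above, a minimal sketch could look like this (consumerConfigs() stands in for your plain consumer properties without any deserializer entries; KafkaEvent is the payload type used in the full example below):

@Bean
public ConsumerFactory<String, KafkaEvent> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new ErrorHandlingDeserializer<>(new StringDeserializer()),
            new ErrorHandlingDeserializer<>(new JsonDeserializer<>(KafkaEvent.class)));
}

The full example below uses the property-based alternative: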

package com.mypackage.app.config;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.mypackage.app.model.kafka.message.KafkaEvent;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import lombok.extern.slf4j.Slf4j;

@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${listener.group-id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> ListenerFactory() {
    
        ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler(((exception, data) -> {
            /*
             * Here you can do your custom handling. This example just logs the failure,
             * the same as the default error handler does. If you only want to log, you
             * need not configure an error handler here; the default handler does it for
             * you. Generally, you would persist the failed records to a DB to track them.
             */
            log.error("Error in process with Exception {} and the record is {}", exception, data);
        }));

        return factory;

    }

    @Bean
    public ConsumerFactory<String, KafkaEvent> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
                "com.mypackage.app.model.kafka.message.KafkaEvent");
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mypackage.app");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        /*
         * The retry policy sets the number of retry attempts and which exceptions
         * should be retried and which should not.
         */
        retryTemplate.setRetryPolicy(retryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();

        // the boolean value in the map determines whether the exception should be retried
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);
        exceptionMap.put(ListenerExecutionFailedException.class, true);

        return new SimpleRetryPolicy(3, exceptionMap, true);
    }
}
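
For context (not part of the original answer; the topic name is a placeholder), a listener wired to this factory might look like:

@KafkaListener(topics = "my-topic", containerFactory = "ListenerFactory")
public void consume(KafkaEvent event) {
    log.info("Received event {}", event);
}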
Sorehead answered 25/11, 2020 at 10:41 Comment(0)

In my factory I've added a CommonErrorHandler:

factory.setCommonErrorHandler(new KafkaMessageErrorHandler());

and KafkaMessageErrorHandler is created as follows:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
class KafkaMessageErrorHandler implements CommonErrorHandler {

    @Override
    public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
        manageException(thrownException, consumer);
    }

    @Override
    public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
        manageException(thrownException, consumer);
    }

    private void manageException(Exception ex, Consumer<?, ?> consumer) {
        log.error("Error polling message: " + ex.getMessage());
        if (ex instanceof RecordDeserializationException) {
            RecordDeserializationException rde = (RecordDeserializationException) ex;
            consumer.seek(rde.topicPartition(), rde.offset() + 1L);
            consumer.commitSync();
        } else {
            log.error("Exception not handled");
        }
    }
}
Viral answered 20/9, 2022 at 13:5 Comment(0)

The above answer may have a problem if the topic name contains a '-' character, so I have modified the same logic with a regex.

    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.SerializationException;
    import org.springframework.kafka.listener.ErrorHandler;
    import org.springframework.kafka.listener.MessageListenerContainer;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class KafkaErrHandler implements ErrorHandler {
    
        /**
         * Method prevents serialization error freeze
         * 
         * @param e
         * @param consumer
         */
        private void seekSerializeException(Exception e, Consumer<?, ?> consumer) {
            String p = ".*partition (.*) at offset ([0-9]*).*";
            Pattern r = Pattern.compile(p);
    
            Matcher m = r.matcher(e.getMessage());
    
            if (m.find()) {
                int idx = m.group(1).lastIndexOf("-");
                String topics = m.group(1).substring(0, idx);
                int partition = Integer.parseInt(m.group(1).substring(idx));
                int offset = Integer.parseInt(m.group(2));
    
                TopicPartition topicPartition = new TopicPartition(topics, partition);
    
                consumer.seek(topicPartition, (offset + 1));
    
                log.info("Skipped message with offset {} from partition {}", offset, partition);
            }
        }
    
        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
            log.error("Error in process with Exception {} and the record is {}", e, record);
    
            if (e instanceof SerializationException)
                seekSerializeException(e, consumer);
        }
    
        @Override
        public void handle(Exception e, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {
            log.error("Error in process with Exception {} and the records are {}", e, records);
    
            if (e instanceof SerializationException)
                seekSerializeException(e, consumer);
    
        }
    
        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> record) {
            log.error("Error in process with Exception {} and the record is {}", e, record);
        }
    
    } 

Finally, use the error handler in the config:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericType> macdStatusListenerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, GenericType> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(macdStatusConsumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setErrorHandler(new KafkaErrHandler());

    return factory;
}

However, parsing the error string to get the partition, topic, and offset is not recommended. If anyone has a better solution, please post it here.
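
One alternative that avoids parsing the message string (a sketch along the lines of the CommonErrorHandler answer above): newer kafka-clients wrap the failure in a RecordDeserializationException, which exposes the topic-partition and offset directly:

if (e instanceof RecordDeserializationException) {
    RecordDeserializationException rde = (RecordDeserializationException) e;
    consumer.seek(rde.topicPartition(), rde.offset() + 1L);
}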

Sorehead answered 24/11, 2020 at 13:13 Comment(1)
Your solution is better than the approved one. I changed int partition = Integer.parseInt(m.group(1).substring(idx)); to int partition = Integer.parseInt(m.group(1).substring(idx+1)); to avoid a negative number. (Haney)

You can set a spring.deserializer.key.function or spring.deserializer.value.function property on your @KafkaListener; it lets you try to recover a message that failed deserialization and return an alternative value.

@KafkaListener(
  topicPattern = "foo",
  properties = {
    "spring.deserializer.key.function=my.DeserializationErrorFunction"
  }
)

The function would be defined as follows:

import java.util.function.Function;

import org.apache.kafka.common.errors.SerializationException;
import org.springframework.kafka.support.serializer.FailedDeserializationInfo;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DeserializationErrorFunction
    implements Function<FailedDeserializationInfo, MyObject> {

  @Override
  public MyObject apply(FailedDeserializationInfo failedDeserializationInfo) {

    if (failedDeserializationInfo.getException() instanceof SerializationException) {
      log.warn(
          "SerializationException received handling message, likely it's old data format.");
      return MyObject.INVALID;
    } else {
      return null;
    }
  }
}

Then the @KafkaListener-annotated method could be implemented as:

public void processMessage(@Payload MyObject myObject) {
  if (myObject == MyObject.INVALID) {
    log.error("got invalid data, just skipping")
    return;
  }
  // ... the rest of the processing
}
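
If you prefer, the same function can also be configured once in the consumer properties instead of per listener (a sketch; config is assumed to be your consumer properties map):

config.put(ErrorHandlingDeserializer.VALUE_FUNCTION, DeserializationErrorFunction.class);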
Pervade answered 12/4 at 4:42 Comment(0)
