How to handle exceptions at the consumer in Spring?

How do I perform error handling in a Spring XML + Kafka app? I am using JSON to produce and consume messages, but when the consumer gets junk data, it runs in an endless loop.

Here is what I used

spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <bean id="employeeProducer" class="com.example.kafka.producer.EmployeeProducer" />

    <bean id="myListener" class="com.example.kafka.listener.MyMessageListener" />

    <bean id="employeeConsumer" class="com.example.kafka.consumer.EmployeeKafkaConsumer2" >
        <constructor-arg name="topic" value="t-employee" />
    </bean>
</beans>
Tigon answered 18/4, 2024 at 14:14 Comment(4)
Would be great if you share what the error is there. Also some simple project to let us reproduce on our side would be very helpful.Laocoon
@ArtemBilan - have uploaded code here: github.com/javaHelper/spring-boot-advance-demos/tree/main/…Tigon
Can you isolate that, please, into an independent project and provide some reproducer. Not sure what is "junk" in your opinion...Laocoon
Keep the app running and send data manually with the console producer: kafka-console-producer --broker-list localhost:9092 --topic t-employee >{"empId":66,"firstName":"Jessie","lastName":"Wiegand","gender":"M" Tigon

OK. I cannot run your application because it is part of a much bigger project, so it is hard for me to isolate it. However, I see the problem. You do this:

    DefaultKafkaConsumerFactory<String, Employee> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
            propsMap,
            new StringDeserializer(),
            new AppJsonDeserializer<Employee>()
    );

Therefore all those ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG config props are ignored.

Since you do not have any error handling in your AppJsonDeserializer:

    try {
        return objectMapper.readValue(data, className);
    } catch (Exception e) {
        LOGGER.error("## Exception : {}", e.getMessage());
        throw new SerializationException(e);
    }

it is not a surprise that you got an endless loop for the same "junk" record on its deserialization.
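Why the same record keeps coming back can be shown with a toy model. This is only a sketch in plain Java, not Kafka code: `StuckPollDemo`, `deserialize` and `pollLoop` are hypothetical names, and the "log" is just a list of strings. The assumption it illustrates is that the consumer position only moves past a record once that record has been deserialized, so a record that always throws is retried on every poll.

```java
import java.util.List;

// Toy model only: none of these types are Kafka classes.
class StuckPollDemo {

    /** Hypothetical stand-in for a strict JSON deserializer. */
    static String deserialize(String raw) {
        String trimmed = raw.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            throw new IllegalArgumentException("malformed JSON: " + raw);
        }
        return trimmed;
    }

    /**
     * Polls the log at most maxPolls times and returns the final position.
     * The position advances only after a successful deserialization.
     */
    static int pollLoop(List<String> log, int maxPolls) {
        int position = 0;
        for (int poll = 0; poll < maxPolls && position < log.size(); poll++) {
            try {
                deserialize(log.get(position));
                position++; // success: move on to the next record
            } catch (IllegalArgumentException e) {
                // failure: position is unchanged, so the next poll hits the same record
                System.out.println("poll " + poll + " failed at offset " + position);
            }
        }
        return position;
    }

    public static void main(String[] args) {
        List<String> log = List.of(
                "{\"empId\":1}",
                "{\"empId\":66,\"firstName\":\"Jessie\"", // truncated, like the record in the question
                "{\"empId\":2}");
        System.out.println("final position = " + pollLoop(log, 10));
    }
}
```

In this model the third record is never reached, no matter how many polls are allowed, because nothing ever skips the truncated one.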

UPDATE

I still was not able to run your application properly: too many Docker containers, most of which are not related to the subject.

I think the solution must be like this:

private void initializeConsumer(String topic) {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfig.CLIENT_ID);
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVER);

    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    propsMap.put(AppJsonDeserializer.VALUE_CLASS_NAME_CONFIG, Employee.class);

    DefaultKafkaConsumerFactory<String, Employee> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
            propsMap,
            new StringDeserializer(),
            new ErrorHandlingDeserializer<>(new AppJsonDeserializer<>())
    );

    ContainerProperties containerProperties = new ContainerProperties(topic);
    containerProperties.setMessageListener(new MyMessageListener());

    ConcurrentMessageListenerContainer<String, Employee> container =
            new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProperties);

    container.start();
}

As I said: those props for the deserializer are ignored when you provide an instance to the DefaultKafkaConsumerFactory ctor. Therefore your AppJsonDeserializer has to be wrapped into an ErrorHandlingDeserializer.
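The idea behind the wrapping can be sketched as a plain decorator. This is a toy model under stated assumptions, not the ErrorHandlingDeserializer API: `WrappingDemo`, `Result`, `wrap` and `strictJson` are hypothetical names. In Spring Kafka itself, the wrapper returns a null value and carries the exception in a record header so the container's error handler can deal with it; here a small `Result` object plays that role.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model only: it imitates the idea of a failure-capturing wrapper,
// not the real ErrorHandlingDeserializer.
class WrappingDemo {

    /** Outcome for one record: either a value or the captured failure. */
    static final class Result<T> {
        final T value;
        final RuntimeException error;

        Result(T value, RuntimeException error) {
            this.value = value;
            this.error = error;
        }

        boolean failed() {
            return error != null;
        }
    }

    /** Wraps a delegate so a failure becomes a Result instead of a thrown exception. */
    static <T> Function<String, Result<T>> wrap(Function<String, T> delegate) {
        return raw -> {
            try {
                return new Result<>(delegate.apply(raw), null);
            } catch (RuntimeException e) {
                return new Result<>(null, e);
            }
        };
    }

    /** Hypothetical strict delegate that rejects malformed input. */
    static String strictJson(String raw) {
        String trimmed = raw.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            throw new IllegalArgumentException("malformed JSON: " + raw);
        }
        return trimmed;
    }

    /** Every record yields a Result, so processing always reaches the end of the log. */
    static List<Result<String>> consumeAll(List<String> log) {
        Function<String, Result<String>> safe = wrap(WrappingDemo::strictJson);
        List<Result<String>> results = new ArrayList<>();
        for (String raw : log) {
            results.add(safe.apply(raw));
        }
        return results;
    }

    public static void main(String[] args) {
        for (Result<String> r : consumeAll(List.of("{\"empId\":1}", "{\"empId\":66", "{\"empId\":2}"))) {
            System.out.println(r.failed() ? "skipped: " + r.error.getMessage() : "ok: " + r.value);
        }
    }
}
```

Because the wrapper never throws, the bad record is reported once and the records after it are still processed.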

It is still wrong to do container.start() from the bean constructor. Plus it is better to have a DefaultKafkaConsumerFactory and ConcurrentMessageListenerContainer as beans. This way their configuration and lifecycle are going to be managed properly by Spring.

Laocoon answered 18/4, 2024 at 15:1 Comment(6)
If you use an explicit Deserializer for the factory, then you need to look into using an ErrorHandlingDeserializer. All those consumer config props for the deserializer are ignored in the case of new AppJsonDeserializer<Employee>(). Consider wrapping it into an ErrorHandlingDeserializer if you are not going to rely on props.Laocoon
To be honest, I tried multiple ways. If you have some time, it would be nice if you could update the solution. I also cannot find any documentation on DefaultKafkaConsumerFactory and exception handling.Tigon
This is not a DefaultKafkaConsumerFactory responsibility. See respective docs: docs.spring.io/spring-kafka/reference/kafka/…Laocoon
See an UPDATE in my answer.Laocoon
Great, thanks. Working. I really like folks like you who provide an end-to-end working solution and save many hours of struggling, kudos to you!Tigon
OK, since my project uses Spring XML and no Boot, I will create the XML beans and use them from there.Tigon
