Kafka Listener method could not be invoked with the incoming message
I am sending an array of JSON objects from a Kafka producer in a Spring Boot app, converting it with toString() before sending, but I am getting the following error in the consumer:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void com.springboot.service.KafkaReciever.recieveData(com.springboot.model.Student,java.lang.String) throws java.lang.Exception] Bean [com.springboot.service.KafkaReciever@5bb3d42d]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.springboot.model.Student] for GenericMessage [payload=[com.springboot.model.Student@5e40dc31, com.springboot.model.Student@235e68b8], headers={kafka_offset=45, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=myTopic-kafkasender}], failedMessage=GenericMessage [payload=[com.springboot.model.Student@5e40dc31, com.springboot.model.Student@235e68b8], headers={kafka_offset=45, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=myTopic-kafkasender}]

Configuration File:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.boot.server}")
    private String kafkaServer;

    @Value("${kafka.consumer.group.id}")
    private String kafkaGroupId;

    @Bean
    public ConsumerFactory<String, String> consumerConfig() {

         Properties props = new Properties();

         props.put("bootstrap.servers", "localhost:9092");
         props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
         props.put("message.assembler.buffer.capacity", 33554432);
         props.put("max.tracked.messages.per.partition", 24);
         props.put("exception.on.message.dropped", true);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put("segment.deserializer.class", DefaultSegmentDeserializer.class.getName());

         return new DefaultKafkaConsumerFactory(props, null, new StringDeserializer());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> listener = new ConcurrentKafkaListenerContainerFactory<>();
        listener.setConsumerFactory(consumerConfig());
        return listener;
    }
}

Receiver file:

@Service
public class KafkaReciever {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReciever.class);

    @KafkaListener(topics = "${kafka.topic.name}", group = "${kafka.consumer.group.id}")
    public void recieveData(@Payload Student student, @Header(KafkaHeaders.MESSAGE_KEY) String messageKey) throws Exception{
        LOGGER.info("Data - " + student + " recieved");
    }
}

POST JSON:

[
    {
        "studentId": "Q45678123",
        "firstName": "Anderson",
        "lastName": "John",
        "age": "12",
        "address": {
            "apartment": "apt 123",
            "street": "street Info",
            "state": "state",
            "city": "city",
            "postCode": "12345"
        }
    },
    {
        "studentId": "Q45678123",
        "firstName": "abc",
        "lastName": "xyz",
        "age": "12",
        "address": {
            "apartment": "apt 123",
            "street": "street Info",
            "state": "state",
            "city": "city",
            "postCode": "12345"
        }
    }
]

I am getting the following consumer output:

[com.springboot.model.Student@5e40dc31, com.springboot.model.Student@235e68b8]
Headmaster answered 16/9, 2019 at 8:59 Comment(6)
Your consumer expects a String (per its deserializer), but it's receiving a Student. Do you have access to the producer's serializer? You need something like this: #40154586 - Saveall
My model class is: public class Student implements Serializable { private static final long serialVersionUID = 1L; private String studentId; private String firstName; private String lastName; private String age; private Address address; get/set } - Headmaster
I changed VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer and then got the following error: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition myTopic-kafkasender-0 at offset 47 Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of com.springboot.model.Student out of START_ARRAY token at [Source: [B@304eaefb; line: 1, column: 1] - Headmaster
Instead of ConsumerFactory<String, String>, please try using your payload object type: if your class is named Student, use ConsumerFactory<String, Student>. Also, in place of props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); you can use a JSON deserializer. If you need a proper answer, ping me and I will help you. - Spore
@harkesh Thanks. After adding this I am getting the same error. Changing the service parameter to '@Payload List<Student> student' solved the error, but the data is not converted to an array of JSON. - Headmaster
@VaibhavShelar If you are able to fix it, then make a sample project and share it; I will update your code and fix the issue. - Spore
G
6

Can not deserialize instance of com.springboot.model.Student out of START_ARRAY

If you are using a JSON deserializer, the payload is a list, not a single Student:

@Payload List<Student> student

Or, if you are using the String deserializer, the payload is a String of JSON and you must parse it manually:

receiveData(@Payload String student ... ) { 
    JsonNode data = new ObjectMapper().readTree(student); // for example, but should extract ObjectMapper to a field
}

Regarding your other output, please see How do I print my Java object without getting "SomeType@2f92e0f4"?
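
The `[com.springboot.model.Student@5e40dc31, ...]` payload is what `List.toString()` yields when the element class does not override `toString()`, which suggests the producer sent that text rather than JSON. A minimal sketch of the difference follows; `PlainStudent`, `ReadableStudent`, and `ToStringDemo` are hypothetical, trimmed-down stand-ins for illustration, not the asker's actual model class.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a model class WITHOUT a toString() override.
final class PlainStudent {
    final String studentId;
    final String firstName;

    PlainStudent(String studentId, String firstName) {
        this.studentId = studentId;
        this.firstName = firstName;
    }
    // Inherits Object.toString(), which returns ClassName@hexHashCode.
}

// Hypothetical stand-in for the same model WITH a toString() override.
final class ReadableStudent {
    final String studentId;
    final String firstName;

    ReadableStudent(String studentId, String firstName) {
        this.studentId = studentId;
        this.firstName = firstName;
    }

    @Override
    public String toString() {
        return "Student{studentId='" + studentId + "', firstName='" + firstName + "'}";
    }
}

final class ToStringDemo {
    // List.toString() wraps each element's toString() in square brackets.
    static String render(List<?> students) {
        return students.toString();
    }

    public static void main(String[] args) {
        // Prints something like [PlainStudent@1b6d3586]; the hash varies per run.
        System.out.println(render(Arrays.asList(new PlainStudent("Q45678123", "Anderson"))));
        // Prints [Student{studentId='Q45678123', firstName='Anderson'}]
        System.out.println(render(Arrays.asList(new ReadableStudent("Q45678123", "Anderson"))));
    }
}
```

Even with the override, the result is a debugging string and not JSON, so the consumer still cannot map it to a Student. The producer has to serialize the list as real JSON (for example with a JSON serializer or an ObjectMapper) before sending.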

Georgiannageorgianne answered 16/9, 2019 at 12:55 Comment(2)
Thanks. I now parse the JSON data in the producer before sending it to the consumer, and after that I get proper JSON data in the consumer. I just wanted to check whether I am doing this the correct way. - Headmaster
As long as there are no exceptions, I would say nothing is wrong. - Georgiannageorgianne
M
0

Not directly about your case, but mind your bean names; I almost went crazy over this. By default, @KafkaListener looks for a listener container factory bean named kafkaListenerContainerFactory. If your @Bean method has that name, the framework picks it up automatically, with no explicit reference needed in the configuration or listener setup. If you give the bean any other name, you have to reference it explicitly through the containerFactory attribute of @KafkaListener.

Metallo answered 7/11 at 13:57 Comment(2)
This does not provide an answer to the question. Once you have sufficient reputation you will be able to comment on any post; instead, provide answers that don't require clarification from the asker. - From Review - Rosana
As it's currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center. - Dylan
