Producer properties
# Kafka broker the producer connects to (localhost, port 9092).
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
# Record keys are written as plain strings.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Record values are written as JSON by Spring Kafka's JsonSerializer.
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
Consumer properties
# Kafka broker the consumer connects to (localhost, port 9092).
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
# Record keys are read as plain strings.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Record values are read as JSON by Spring Kafka's JsonDeserializer.
# NOTE(review): JsonDeserializer is configured directly here, without the
# ErrorHandlingDeserializer wrapper that the container's error message recommends,
# so a record that cannot be deserialized surfaces as a consumer exception.
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# "*" trusts every package for JSON type mapping.
# NOTE(review): consider narrowing this to the package that declares User - confirm.
spring.kafka.consumer.properties.spring.json.trusted.packages=*
# Consumer group this application joins.
spring.kafka.consumer.group-id=user-group
# Port the application's embedded web server listens on.
server.port=8085
Consumer Service
/**
 * Kafka consumer that receives {@code User} messages from the
 * {@code user-topic} topic and prints a short summary of each one.
 */
@Service
public class UserConsumerService {

    /**
     * Handles one {@code User} record delivered by the listener container.
     * Writes the user's age and favourite genre to standard output.
     *
     * @param user the deserialized message payload
     */
    @KafkaListener(topics = "user-topic")
    public void consumerUserData(User user) {
        // Build the line first so the print statement stays trivial.
        final String summary = "Users Age Is: " + user.getAge() + " Fav Genre " + user.getFavGenre();
        System.out.println(summary);
    }
}
Producer Service
/**
 * Kafka producer that publishes {@code User} messages to the
 * {@code user-topic} topic.
 */
@Service
public class UserProducerService {

    /** Topic every user record is sent to. */
    private static final String USER_TOPIC = "user-topic";

    /** Template injected by Spring; keys are strings, values are {@code User} objects. */
    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    /**
     * Sends the given user to the topic, using the user's name as the record key.
     * The result returned by the template is not inspected.
     *
     * @param user the user to publish
     */
    public void sendUserData(User user) {
        final String recordKey = user.getName();
        kafkaTemplate.send(USER_TOPIC, recordKey, user);
    }
}
Producer Config for creating topic
/**
 * Spring configuration that declares the Kafka topic used for user messages.
 */
@Configuration
public class KafkaConfig {

    private static final String TOPIC_NAME = "user-topic";
    private static final int PARTITION_COUNT = 2;
    private static final int REPLICA_COUNT = 1;

    /**
     * Builds the {@code user-topic} definition with two partitions and one replica.
     *
     * @return the topic definition exposed as a bean
     */
    @Bean
    public NewTopic topicOrder() {
        return TopicBuilder.name(TOPIC_NAME)
                .partitions(PARTITION_COUNT)
                .replicas(REPLICA_COUNT)
                .build();
    }
}
The producer works well, but the consumer fails with the following error:
2021-12-06 21:45:50.299 ERROR 4936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an
'ErrorHandlingDeserializer' in the value and/or key deserializer
at
org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149)
~[spring-kafka-2.8.0.jar:2.8.0] DefaultErrorHandler.java:149
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1760)
~[spring-kafka-2.8.0.jar:2.8.0]
KafkaMessageListenerContainer.java:1760
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1283)
~[spring-kafka-2.8.0.jar:2.8.0]
KafkaMessageListenerContainer.java:1283
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
~[na:na] Executors.java:539
at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
~[na:na] FutureTask.java:264
at
java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Thread.java:833 Caused by:
org.apache.kafka.common.errors.RecordDeserializationException: Error
deserializing key/value for partition user-topic-0 at offset 1. If
needed, please seek past the record to continue consumption.
at
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:1429
at
org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:134
at
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:1652
at
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:1488
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:721
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672)
~[kafka-clients-3.0.0.jar:na] Fetcher.java:672
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277)
~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1277
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1238
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
~[kafka-clients-3.0.0.jar:na] KafkaConsumer.java:1211
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1507)
~[spring-kafka-2.8.0.jar:2.8.0]
KafkaMessageListenerContainer.java:1507
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1497)
~[spring-kafka-2.8.0.jar:2.8.0]
KafkaMessageListenerContainer.java:1497
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1325)
~[spring-kafka-2.8.0.jar:2.8.0] KafkaMessage
I am new to Kafka and trying to figure out why I'm getting this error. How do I fix this?
consumer
part and be sure that you use Deserializer
there. – Heliozoan
cause
of such a SerializationException
... – Heliozoan