OK. I cannot run your application because it is part of a much bigger project, so it is hard for me to isolate. However, I see the problem. You do this:
DefaultKafkaConsumerFactory<String, Employee> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
        propsMap,
        new StringDeserializer(),
        new AppJsonDeserializer<Employee>()
);
Because you pass deserializer instances into the factory ctor, all those ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG config props are ignored.
Since you do not have any error handling in your AppJsonDeserializer:

try {
    return objectMapper.readValue(data, className);
} catch (Exception e) {
    LOGGER.error("## Exception : {}", e.getMessage());
    throw new SerializationException(e);
}
it is no surprise that you got an endless loop on the same "junk" record: the SerializationException is thrown out of poll() before the offset advances, so the very same record is fetched again on the next poll.
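Those retry mechanics can be sketched without Kafka at all. In this minimal plain-Java simulation, `strictParse`, `errorHandling` and the record list are made-up stand-ins for the real deserializer and topic; the point is only that the "offset" advances on success, so a throwing deserializer pins the loop on one record while a catching wrapper moves past it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PoisonPillDemo {
    // Stand-in for a value deserializer that throws on junk input.
    static Integer strictParse(String data) {
        return Integer.parseInt(data); // NumberFormatException on junk
    }

    // ErrorHandlingDeserializer-style wrapper: catch instead of rethrowing.
    static Function<String, Integer> errorHandling(Function<String, Integer> delegate) {
        return data -> {
            try {
                return delegate.apply(data);
            } catch (RuntimeException e) {
                return null; // the container would route this to an error handler
            }
        };
    }

    // Simulated consumer loop: the offset advances only on a successful poll.
    static List<Integer> consume(List<String> records, Function<String, Integer> deser, int maxPolls) {
        List<Integer> delivered = new ArrayList<>();
        int offset = 0, polls = 0;
        while (offset < records.size() && polls++ < maxPolls) {
            try {
                delivered.add(deser.apply(records.get(offset)));
                offset++; // success: move to the next record
            } catch (RuntimeException e) {
                // failure: offset NOT advanced, same "junk" record fetched again
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> topic = List.of("1", "junk", "3");
        // Without error handling: stuck on "junk" (capped at 10 polls here).
        System.out.println(consume(topic, PoisonPillDemo::strictParse, 10));
        // With the wrapper: the bad record becomes null and the rest is consumed.
        System.out.println(consume(topic, errorHandling(PoisonPillDemo::strictParse), 10));
    }
}
```

The first run delivers only the record before the poison pill; the second run gets through the whole list.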
UPDATE
I still was not able to run your application properly: too many Docker containers, most of which are not related to the subject.
I think the solution must be like this:
private void initializeConsumer(String topic) {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfig.CLIENT_ID);
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVER);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    propsMap.put(AppJsonDeserializer.VALUE_CLASS_NAME_CONFIG, Employee.class);
    DefaultKafkaConsumerFactory<String, Employee> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
            propsMap,
            new StringDeserializer(),
            new ErrorHandlingDeserializer<>(new AppJsonDeserializer<>())
    );
    ContainerProperties containerProperties = new ContainerProperties(topic);
    containerProperties.setMessageListener(new MyMessageListener());
    ConcurrentMessageListenerContainer<String, Employee> container =
            new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProperties);
    container.start();
}
As I said: those deserializer props are ignored when you provide an instance into the DefaultKafkaConsumerFactory ctor. Therefore your AppJsonDeserializer has to be wrapped into an ErrorHandlingDeserializer, which catches the deserialization failure instead of letting it escape from poll(), so the consumer can move past the bad record.
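If you would rather keep everything in the config props (so they are not ignored), recent spring-kafka versions let the factory instantiate both the wrapper and its delegate from class names. A sketch of just the two relevant entries, with the string literals standing in for ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS; `com.example.AppJsonDeserializer` is a hypothetical fully-qualified name for your class:

```java
import java.util.HashMap;
import java.util.Map;

public class PropsBasedConfig {
    // Build the consumer props so that the factory, not the caller, creates the
    // deserializers -- with the one-arg DefaultKafkaConsumerFactory ctor these
    // props are honored instead of ignored.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> the wrapper:
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        // ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS -> the wrapped delegate
        // (hypothetical FQCN of your deserializer):
        props.put("spring.deserializer.value.delegate.class",
                "com.example.AppJsonDeserializer");
        return props;
    }
}
```

With this style you would call new DefaultKafkaConsumerFactory<>(propsMap) without passing any deserializer instances at all.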
It is still wrong to do container.start() from the bean constructor. Plus, it is better to have the DefaultKafkaConsumerFactory and the ConcurrentMessageListenerContainer as beans. This way their configuration and lifecycle are going to be managed properly by Spring.
kafka-console-producer --broker-list localhost:9092 --topic t-employee >{"empId":66,"firstName":"Jessie","lastName":"Wiegand","gender":"M"
– Tigon