Acknowledgement.acknowledge() throwing exception in spring-kafka @KafkaListener
Asked Answered
D

2

14

When I set enable.auto.commit to false and try to manually commit offset using annotation based spring-kafka @KafkaListener, I get a org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message

I have a very simple code as follows:

@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgement ack) {
    System.out.println("Received Messasge in group 'foo': " + message);

    // TODO: Do something with the message
}

And when I send a message from the producer, I get the following exception:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message.

Endpoint handler details:

Method [public void com.****.*****.*******.KafkaMessageListener.listenFooGroup(java.lang.String,org.springframework.kafka.support.Acknowledgment)]

Bean [com.****.*****.*******.KafkaMessageListener@5856dbe4]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}], failedMessage=GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}]

Please help. TIA.

Dismissive answered 27/6, 2017 at 19:17 Comment(0)
B
34

You have to the set the container factory's containerProperties ackMode to MANUAL or MANUAL_IMMEDIATE to get an Acknowledgment object.

With other ack modes, the container is responsible for committing the offset.

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE)

Or set the ....ackMode property if using Spring Boot

Bigamous answered 27/6, 2017 at 19:49 Comment(6)
2.0 will throw a less obscure exception - thanks for pointing this out.Bigamous
It does not throw an exception now. Thanks a lot. Appreciate your help.Dismissive
The next release will throw a more meaningful exception. new IllegalStateException("No Acknowledgment availailable as an argument, the listener container must have a MANUAL Ackmode to populate the Acknowledgment.",. Thanks for pointing this out.Bigamous
Can somebody please tell me where to set the abovementioned ackMode property for Spring Boot.Battement
Don't ask new questions in comments on very old answers. spring.kafka.listener.ack-mode - docs.spring.io/spring-boot/docs/2.3.2.RELEASE/reference/html/…Bigamous
@Battement factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);Sylviesylvite
R
1

Solved this issue in spring boot 2.4.3 by applying the following to application.properties

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

Set ack-mode to manual or manual_immediate, based on your use case.

Reticulation answered 19/10, 2023 at 20:25 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.