How to use Spring Kafka's Acknowledgement.acknowledge() method for manual commit
Asked Answered
B

4

20

I am using Spring Kafka first time and I am not able to use Acknowledgement.acknowledge() method for manual commit in my consumer code as mentioned here https://docs.spring.io/spring-kafka/reference/html/_reference.html#committing-offsets. Mine is spring-boot application. If I am not using manual commit process than my code is working fine. But when I use Acknowledgement.acknowledge() for manual commit it shows error related to bean. Also If I am not using manual commit properly please suggest me the right way to do it.

Error message:

***************************
APPLICATION FAILED TO START
***************************

Description:

Field ack in Receiver required a bean of type 'org.springframework.kafka.support.Acknowledgment' that could not be found.


Action:

Consider defining a bean of type 'org.springframework.kafka.support.Acknowledgment' in your configuration.

I googled this error I found that I need to add @Component but that is already there in my consumer code.

My consumer code looks like this: Receiver.java

import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    @Autowired
    public Acknowledgment ack;

    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "${kafka.topic.TestTopic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord){
            System.out.println(consumerRecord.value());
            latch.countDown();
            ack.acknowledge();
    }
}

My producer code looks like this: Sender.java

import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, Map<String, Object>> kafkaTemplate;

    public void send(Map<String, Object> map){
            kafkaTemplate.send("TestTopic", map);

    }

}

EDIT 1:

My new consumer code looks like this: Receiver.java

import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "${kafka.topic.TestTopic}", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void receive(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack){
            System.out.println(consumerRecord.value());
            latch.countDown();
            ack.acknowledge();
    }
}

I changed my configuration class also:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() throws SendGridException {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
            return props;

    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /*@Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }*/

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}

After adding containerFactory = "kafkaManualAckListenerContainerFactory" to my receive() method I am getting the below error.

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found bean 'consumerFactory'


Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.
Butte answered 20/9, 2017 at 15:1 Comment(0)
I
25

For those still looking for a solution to these errors concerning manual acknowledgment, you don't need to specify containerFactory = "kafkaManualAckListenerContainerFactory", instead you can just add:

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

to your receiver config just before you return the factory object.

Then you also need:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

in consumer config props.

So in the end your listener method can simply look like:

@KafkaListener(topics = "${spring.kafka.topic}")
    private void listen(@Payload String payload, Acknowledgment acknowledgment) {
        //Whatever code you want to do with the payload
        acknowledgement.acknowledge(); //or even pass the acknowledgment to a different method and acknowledge even later
    }
Interlace answered 30/1, 2019 at 9:55 Comment(7)
Does anyone have a minimal working example of an entire working app using this pattern?Supat
Sure thing, I created this simple project for you, take a look at my git page: github.com/zim8662/kafka-example Just run the zookeeper and kafka server locally, or if it's remote change the property in application.properties (along with group and topic properties of course) and try localhost:8080/test in your browser.Interlace
Hi Zim - wondering what happens if you then set props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)? Will kafka still autocommit? and will Spring try to commit again?Huskey
@Huskey I think this is a simple yet good explanation on this property: medium.com/@danieljameskay/…Interlace
I think your answer is not good, since the change will affect all Kafka listeners bounded to the ListenerContainerFactory.Gainless
When we are dealing acknowledgement manually, we need to set the ContainerProperties.AckMode.MANUAL_IMMEDIATE and map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false) . Otherwise we will end up the below exception. org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes[];arguments[];Antidepressant
auto commit is disabled by default now (spring kafka listeners). so its really just setting that ackMode. What im trying to figure out is the Acknowledgment piece because it seems NOT needed. it seems you can throw kafka exceptions to control behavior of offset commit.Bueschel
M
17

You really should follow documentation:

When using manual AckMode, the listener can also be provided with the Acknowledgment; this example also shows how to use a different container factory.

@KafkaListener(id = "baz", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

There is really nowhere noted that Acknowledgment is a bean. So, change your receive() @KafkaListener method signature appropriately and remove that @Autowired for suspicious Acknowledgment bean - it just doesn't exists because this object is a part (header) of each received message.

Megganmeggi answered 20/9, 2017 at 15:17 Comment(4)
Hi @Artem , I did the changes but now getting another bean error. I update my question with new Edit. The spring kafka documentation is not that much clear.Butte
1. You still don't have Acknowledgment arg in your receive() method. So, I'm not sure what is that ack var for. 2. You have some mix of raw Spring Kafka configuration and Spring Boot. You should consider to start from there: docs.spring.io/spring-boot/docs/1.5.7.RELEASE/reference/… and just reconfigure out-of-the-box kafkaListenerContainerFactory bean via proper ConsumerFactory bean. In your case it is ConsumerFactory<String, String>, but has to be ConsumerFactory<?, ?>Megganmeggi
Saying "you should follow the docs" is unproductive; the docs here are wildly confusing and not beginner friendly. A link to a minimal working application would be nice.Supat
@ArtemBilan The documentation link is broken.Goines
P
9

Those who are using spring boot application, simple add below to your application.yml (or environment specific file).

spring:
  kafka:
    listener:
      ack-mode: manual

Above change will make Acknowledgment ack argument available inside
receive(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) method automatically.

Picklock answered 9/11, 2021 at 14:37 Comment(0)
E
0

Java code:

@Service
public class ServiceCallKafkaListener {

    @KafkaListener(id = "listenerId",
            groupId = "groupListenerId",
            topics = "topicName")
    public void listenServiceCall(@Payload String message,
                                  Acknowledgment acknowledgment) {
        //here is your logic for message processing
        boolean logicForMessageProcessingCompleted = true;
        if (logicForMessageProcessingCompleted) {
            //manual commit
            acknowledgment.acknowledge();
        }
    }
}

application.properties

#type of acknowledgment mode
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

#property that turns off auto commit
spring.kafka.consumer.enable-auto-commit=false
Entresol answered 27/7, 2023 at 18:15 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.