How does kafka consumer auto commit work?
I am reading this one:

Automatic Commit The easiest way to commit offsets is to allow the consumer to do it for you. If you configure enable.auto.commit=true, then every five seconds the consumer will commit the largest offset your client received from poll(). The five-second interval is the default and is controlled by setting auto.commit.interval.ms. Just like everything else in the consumer, the automatic commits are driven by the poll loop. Whenever you poll, the consumer checks if it is time to commit, and if it is, it will commit the offsets it returned in the last poll.

Maybe the issue is that my English is not good, but I do not fully understand this description.

Let's say I use auto-commit with the default interval of 5 sec, and poll happens every 7 sec. In this case, will a commit happen every 5 sec or every 7 sec?

Can you clarify the behaviour if poll happens every 3 sec? Will a commit happen every 5 sec or every 6 sec?

I have also read this one:

Auto commits: You can set auto.commit to true and set the auto.commit.interval.ms property with a value in milliseconds. Once you've enabled this, the Kafka consumer will commit the offset of the last message received in response to its poll() call. The poll() call is issued in the background at the set auto.commit.interval.ms.

And it contradicts the first quote.

Can you explain this in detail?

Let's say I have a timeline like this:

0 sec - poll
4 sec - poll
8 sec - poll

When will the offsets be committed, and which ones?

Berrios answered 3/10, 2017 at 14:1 Comment(0)
The auto-commit check is performed on every poll: it checks whether the time elapsed since the last commit is greater than the configured interval. If so, the offsets are committed.

If the commit interval is 5 seconds and poll happens every 7 seconds, the commit will only happen at the next poll, i.e. after 7 seconds.
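The check described above can be sketched in a few lines. This is a toy model of the behaviour, not the real ConsumerCoordinator code; the class and method names are illustrative:

```java
// Toy model of the auto-commit check that runs inside each poll().
// The next deadline is reset from the time of the committing poll,
// not from the old deadline.
public class AutoCommitCheck {
    private long nextDeadlineMs;
    private final long intervalMs;

    public AutoCommitCheck(long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextDeadlineMs = intervalMs; // first deadline: one interval after the first poll
    }

    /** Called on each poll(); returns true when the offsets of the previous poll get committed. */
    public boolean maybeAutoCommit(long nowMs) {
        if (nowMs >= nextDeadlineMs) {
            nextDeadlineMs = nowMs + intervalMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AutoCommitCheck c = new AutoCommitCheck(5_000);
        System.out.println(c.maybeAutoCommit(0));      // false: at 0s the deadline is still at 5s
        System.out.println(c.maybeAutoCommit(7_000));  // true: 7s >= 5s, next deadline moves to 12s
        System.out.println(c.maybeAutoCommit(14_000)); // true: 14s >= 12s
    }
}
```

With a 7-second poll period, this reproduces the "commit every 7 seconds" behaviour the answer describes.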

Brubaker answered 3/10, 2017 at 14:35 Comment(4)
And the second case? – Berrios
The second case follows the same logic: on the first poll it won't commit, as 3 < 5, but on the next poll it will commit, as 6 > 5. After the commit it resets the counter, and the same pattern follows. – Brubaker
But how do I commit the last poll in this case? Should I do it manually? – Berrios
When you close the consumer and auto-commit is enabled, it will commit the offsets before closing the consumer. – Brubaker
It will try to auto-commit as soon as possible after poll completes. You can have a look at the source code of ConsumerCoordinator, which has a set of fields defined at class level that track whether auto-commit is enabled, what the interval is, and what the next deadline to perform auto-commit is.

https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L625

And one of the places within poll that perform a call to do storage https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L279

That being said, if for instance poll is executed every 7 seconds and the auto-commit interval is set to 5:

0 - poll, + set deadline to 5th second

7 - poll + commit due to deadline, update deadline to 7+5=12

14 - poll + commit due to deadline, update deadline to 14+5=19

However, if polling is set to every 3 seconds and auto-commit is set to 5:

0 - poll, + set deadline to 5th second

3 - poll, no commit

6 - poll + commit due to deadline, update deadline to 6+5=11
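The two timelines above can be put into a tiny simulation. This is a sketch of the assumed deadline logic, not the actual client code:

```java
import java.util.ArrayList;
import java.util.List;

// Simulates at which poll times an auto-commit would fire, given a fixed poll
// period and auto.commit.interval.ms, following the deadline logic described above.
public class CommitTimeline {
    public static List<Long> commitTimesMs(long pollPeriodMs, long intervalMs, long untilMs) {
        List<Long> commits = new ArrayList<>();
        long deadline = intervalMs;               // set at the first poll (t = 0)
        for (long t = pollPeriodMs; t <= untilMs; t += pollPeriodMs) {
            if (t >= deadline) {                  // this poll reached the deadline
                commits.add(t);
                deadline = t + intervalMs;        // next deadline counts from this poll
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        System.out.println(commitTimesMs(7_000, 5_000, 14_000)); // [7000, 14000]
        System.out.println(commitTimesMs(3_000, 5_000, 9_000));  // [6000]
    }
}
```

The 3-second case shows why the first commit lands on the 6th second, not the 5th: the deadline is only checked at poll time.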

Vortex answered 18/10, 2017 at 22:45 Comment(4)
The first deadline happens only on the 6th second? – Berrios
@Berrios yes, according to the source code of the Kafka clients. I know it may sound stupid, but in general, what are you trying to achieve? In mission-critical systems it's better to commit an offset manually once you are sure the message was processed; in other cases you might prefer less frequent offset commits to speed up processing. However, an offset commit is not trivial, as it involves a round trip to the broker. – Vortex
@Vortex, does that mean it doesn't necessarily commit the offset every interval configured through "auto.commit.interval.ms"? We also need to take the elapsed time into account? – Anselma
@Anselma auto-commit respects the interval and the time it took to process data between two consecutive calls to poll. When it re-initiates the deadline: `public void maybeAutoCommitOffsetsAsync(long now) { ... t...CommitDeadline = now + autoCommitIntervalMs; ... }` – as you can see, it picks now, which comes from the beginning of the poll request. – Vortex
Here is a simple piece of code to test how it works.

doc -> https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html

import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaTest {

    public static final String KAFKA_TOPIC_NAME = "kafka-xx-test-topic";
    public static final String CONSUMER_GROUP_ID = "test-consumer-xx";
    public static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(getProps());
        for (int i = 0; i < 1000; i++) {
            kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC_NAME, "Data_" + i));
        }
        final Consumer<String, String> consumer = new KafkaConsumer<>(getProps());
        consumer.subscribe(Collections.singletonList(KAFKA_TOPIC_NAME));
        TopicPartition actualTopicPartition = new TopicPartition(KAFKA_TOPIC_NAME, 0);
        while (true) {
            final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(60));
            consumerRecords.forEach(record -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(200); // simulate 200 ms of processing per record
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            final long committedOffset = consumer.committed(Collections.singleton(actualTopicPartition)).get(actualTopicPartition).offset();
            final long consumerCurrentOffset = consumer.position(actualTopicPartition);
            System.out.println("Poll finish.. consumer-offset: " + consumerCurrentOffset + " - committed-offset: " + committedOffset + " " + LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")));
        }
    }

    private static Map<String, Object> getProps() {
        // Shared by producer and consumer; each client ignores the other's settings.
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // Default: latest
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // Default: true
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // Default: 500
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Default: 5000
        return props;
    }
}
  • Polls every 2 sec
  • Auto commit every 5 sec

The output looks like this:

Poll finish.. consumer-offset: 1010 - committed-offset: 1000 17:07:05
Poll finish.. consumer-offset: 1020 - committed-offset: 1000 17:07:07
Poll finish.. consumer-offset: 1030 - committed-offset: 1000 17:07:09
Poll finish.. consumer-offset: 1040 - committed-offset: 1030 17:07:11 -> commit when poll finish because of elapsed time(6 sec) > commit interval(5 sec)
Poll finish.. consumer-offset: 1050 - committed-offset: 1030 17:07:13
Poll finish.. consumer-offset: 1060 - committed-offset: 1030 17:07:15
Poll finish.. consumer-offset: 1070 - committed-offset: 1060 17:07:17 -> auto commit 
Poll finish.. consumer-offset: 1080 - committed-offset: 1060 17:07:19
Poll finish.. consumer-offset: 1090 - committed-offset: 1060 17:07:21
Poll finish.. consumer-offset: 1100 - committed-offset: 1090 17:07:23 -> auto commit 
Hiltner answered 9/8, 2021 at 14:25 Comment(3)
How come the consumer is committing offsets like 1020, 1030 when the producer (from the code) just produced 1000 messages? Did this topic have more than 1000 messages? – Anselma
@Anselma I just started it a second time :) the topic has old messages, it's not important – Hiltner
Makes sense :-), yes, it is NOT important, but I asked this question to see if I was missing something – Anselma
Have a look at the configurations below, which give another perspective on Kafka consumer tuning. For 30 records from the producer, if the consumer crashes before 20 seconds have passed, the entire set of 30 records is read again by the consumer, since both max-poll-interval and auto-commit-interval are set to 20 seconds:

 auto-commit-interval: 20000
 auto-offset-reset: latest
 max-poll-records: 10
 max-poll-interval-ms: 20000

But for the configuration below, where an auto-commit happens every 2 seconds, if the consumer were to crash at any point later than 2 seconds in, the records whose offsets have already been committed to Kafka won't be picked up by the consumer again:

 auto-commit-interval: 2000
 auto-offset-reset: latest
 max-poll-records: 10
 max-poll-interval-ms: 20000

Further, auto-commit-interval always takes precedence over max-poll-interval. If the auto-commit did not happen for some strange reason, then once the max-poll-interval of 20 seconds elapsed, the Kafka broker would conclude that the consumer has gone down.
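The reprocessing effect described above can be sketched as a toy model. The numbers are the record counts from the example, not real client behaviour:

```java
// Toy model: after a crash, the consumer restarts from the last committed offset,
// so any records processed after that commit are delivered again (at-least-once).
public class ReplayAfterCrash {
    /** Number of records that will be re-read after a restart. */
    public static long replayedRecords(long processed, long lastCommitted) {
        return processed - lastCommitted;
    }

    public static void main(String[] args) {
        // auto-commit-interval = 20s, crash before the first commit: all 30 replayed
        System.out.println(replayedRecords(30, 0));   // 30
        // auto-commit-interval = 2s, last commit already covered 20 of 30: only 10 replayed
        System.out.println(replayedRecords(30, 20));  // 10
    }
}
```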

Debacle answered 9/8, 2021 at 11:42 Comment(0)
