Kafka - How to commit offset after every message using High-Level consumer?

I'm using Kafka's high-level consumer. Because I'm using Kafka as a 'queue of transactions' for my application, I need to make absolutely sure I don't miss or re-read any messages. I have 2 questions regarding this:

  1. How do I commit the offset to zookeeper? I will turn off auto-commit and commit offset after every message successfully consumed. I can't seem to find actual code examples of how to do this using high-level consumer. Can anyone help me with this?

  2. On the other hand, I've heard committing to zookeeper might be slow, so another way may be to locally keep track of the offsets? Is this alternative method advisable? If yes, how would you approach it?

Assist answered 13/8, 2014 at 18:34 Comment(0)
O
21

There are two relevant settings from http://kafka.apache.org/documentation.html#consumerconfigs: auto.commit.enable and auto.commit.interval.ms.

Committing after every message is difficult to achieve with these settings alone, because auto-commit is driven only by a timer interval, not by message count. You would have to estimate the incoming message rate and set the interval accordingly.

In general, keeping this interval very small is not recommended: it sharply increases the read/write rate on ZooKeeper, which slows down under that load because it keeps its quorum strongly consistent.
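To make the trade-off concrete, here is a small back-of-the-envelope sketch in plain Java. It uses no Kafka classes; the class and method names are hypothetical, and the message rate is an assumed figure, not a measurement. It estimates how many consumed messages could still be uncommitted when a crash hits just before the next timer-driven commit, and how many commit rounds per second a given interval causes.

```java
// Hypothetical helper for reasoning about auto.commit.interval.ms; not part of Kafka.
class AutoCommitWindow {

    // Upper bound on messages consumed but not yet committed if the consumer
    // crashes just before the next timer-driven commit.
    static long maxUncommitted(double messagesPerSecond, long autoCommitIntervalMs) {
        return (long) Math.ceil(messagesPerSecond * autoCommitIntervalMs / 1000.0);
    }

    // Commit rounds per second triggered by one consumer at this interval.
    static double commitsPerSecond(long autoCommitIntervalMs) {
        return 1000.0 / autoCommitIntervalMs;
    }

    public static void main(String[] args) {
        double assumedRate = 200.0; // assumed incoming messages per second
        for (long intervalMs : new long[] {1000L, 100L, 5L}) {
            System.out.println("interval=" + intervalMs + "ms"
                    + " maxUncommitted=" + maxUncommitted(assumedRate, intervalMs)
                    + " commitsPerSecond=" + commitsPerSecond(intervalMs));
        }
    }
}
```

At the assumed 200 messages per second, the interval has to drop to about 5 ms before the uncommitted window shrinks to a single message, and that costs roughly 200 commit rounds per second, which is the ZooKeeper load the paragraph above warns about.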

Odelle answered 20/8, 2014 at 1:10 Comment(2)
what do you think about using the commitOffsets() method? - Assist
Good point! I think that could also work, but the issue is that the consumer connector could commit the offsets of many different streams at the same time and you can't really control which ones to commit. If you want to commit all at the same time, then this does indeed work. Thanks! I didn't know about this. Of course, this only exacerbates the issue of extremely frequent writes to zookeeper. - Odelle
C
35

You could first disable auto commit: auto.commit.enable=false

Then commit after fetching the message: consumer.commitOffsets(true)
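It may help to see what this ordering does and does not guarantee. The following is a minimal, self-contained sketch in plain Java with no Kafka dependency: the in-memory log, the class name and the simulated crash are all hypothetical stand-ins, and the line that advances `committedOffset` is where a real loop would call commitOffsets(). It suggests that committing after each processed message never skips a message, but a crash between processing and committing makes that one message be read again after restart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model, not a Kafka API: a partition log plus the
// committed offset that the high-level consumer would store in ZooKeeper.
class CommitAfterEachMessage {
    static final List<String> LOG = Arrays.asList("tx-0", "tx-1", "tx-2", "tx-3");

    // Processes messages starting at committedOffset and commits after each one.
    // If crashAfterProcessing >= 0, simulates a crash right after that offset
    // has been processed but before it has been committed.
    // Returns the committed offset at the moment the run ends.
    static int run(int committedOffset, int crashAfterProcessing, List<String> processed) {
        for (int offset = committedOffset; offset < LOG.size(); offset++) {
            processed.add(LOG.get(offset));      // 1. handle the message
            if (offset == crashAfterProcessing) {
                return committedOffset;          // crash: the commit never happens
            }
            committedOffset = offset + 1;        // 2. commit (stand-in for commitOffsets())
        }
        return committedOffset;
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        int committed = run(0, 2, processed);        // crash after processing tx-2
        committed = run(committed, -1, processed);   // restart from the committed offset
        System.out.println("committed=" + committed + " processed=" + processed);
    }
}
```

Running it prints `committed=4 processed=[tx-0, tx-1, tx-2, tx-2, tx-3]`: nothing is missed, but tx-2 is handled twice. Under this model, processing has to tolerate a repeated message if re-reads must be harmless.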

Concatenate answered 29/4, 2015 at 18:51 Comment(0)
Y
0

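I solved my problem by disabling auto-commit:

consumerConfig.EnableAutoCommit = false;

and then, after consuming each message with

var consumer = consumerBuilder.Consume(cancelToken.Token);

committing it explicitly with

consumerBuilder.Commit(consumer);

I'm using the Confluent.Kafka client for C#. (Despite the variable names, consumerBuilder here is the built consumer and consumer is the result returned by Consume.)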

Yezd answered 27/7, 2022 at 19:49 Comment(0)
