Kafka consumer manual offset commit

One of my use cases consists of consuming data, performing some operations on it, and producing the result to a new topic.

I'm using the kafkajs npm library: https://www.npmjs.com/package/kafkajs

I would like to commit the offset manually, only after the operations succeed, to avoid any data loss. I'm using autoCommit: false so that offsets are not committed automatically after consuming.

This is the code to commit an offset manually:

consumer.commitOffsets([
  { topic: 'topic-A', partition: 0, offset: '1' }
])

I read somewhere that committing every offset individually (committing immediately after consuming each message) puts extra load on the brokers and is not a good practice.

I need advice from a Kafka expert on the best approach for this use case. How can I avoid data loss without committing after every message?

Schlosser answered 29/8, 2021 at 15:31 Comment(1)
I found a workaround in github.com/tulios/kafkajs/issues/540 - Connection

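To commit offsets manually, use the code below. Note that the committed offset must be the offset of the next message to consume, which is why 1 is added to the offset of the message just processed.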

await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    ...
    await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
  },
});
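
The `+ 1` matters because Kafka treats the committed offset as the position of the next message to read, not the last one processed. As a minimal sketch (the helper names `nextOffset` and `commitEntry` are hypothetical and not part of KafkaJS), the increment could also be done with BigInt, which avoids the precision loss `Number` has for offsets above 2^53 - 1:

```javascript
// KafkaJS exposes offsets as strings because Kafka offsets are 64-bit integers.
// BigInt keeps the arithmetic exact for any offset value.
function nextOffset(offset) {
  return (BigInt(offset) + 1n).toString();
}

// Hypothetical helper: builds one entry for the array passed to
// consumer.commitOffsets(), pointing at the next message to consume.
function commitEntry(topic, partition, message) {
  return { topic, partition, offset: nextOffset(message.offset) };
}
```

Inside `eachMessage`, this would be used as `await consumer.commitOffsets([commitEntry(topic, partition, message)])`.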
Wainscoting answered 2/9, 2022 at 5:22 Comment(1)
"Number(message.offset) + 1" this increment was very useful for me! Thanks, @Karunakaran! - Gatekeeper

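On the broker-load concern raised in the question: one possible approach is to commit only every N processed messages instead of after each one. The sketch below assumes a hypothetical helper, `createCommitTracker`, that is not part of KafkaJS. It tracks the highest processed offset per topic-partition and returns a payload for `consumer.commitOffsets` once a threshold is reached.

```javascript
// Hypothetical helper (not a KafkaJS API): collects processed offsets and
// returns a commit payload once `threshold` messages have been recorded.
function createCommitTracker(threshold) {
  const pending = new Map(); // "topic:partition" -> { topic, partition, offset }
  let sinceLastCommit = 0;

  // Returns everything pending and resets the tracker.
  // Also useful on shutdown, so the last few messages get committed.
  function flush() {
    const offsets = [...pending.values()];
    pending.clear();
    sinceLastCommit = 0;
    return offsets;
  }

  // Call only after a message has been fully processed.
  // Returns an array for consumer.commitOffsets(), or null if no commit is due.
  function record(topic, partition, offset) {
    const next = BigInt(offset) + 1n; // commit the NEXT offset to read
    const key = `${topic}:${partition}`;
    const current = pending.get(key);
    if (!current || BigInt(current.offset) < next) {
      pending.set(key, { topic, partition, offset: next.toString() });
    }
    sinceLastCommit += 1;
    return sinceLastCommit >= threshold ? flush() : null;
  }

  return { record, flush };
}
```

In `eachMessage`, the result of `tracker.record(topic, partition, message.offset)` would be passed to `await consumer.commitOffsets(...)` whenever it is not null. The trade-off is that messages processed since the last commit can be redelivered after a crash or rebalance, so the processing step should tolerate duplicates. KafkaJS also has `autoCommitInterval` and `autoCommitThreshold` options on `consumer.run`, which may cover the same need without custom code.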