How to get the latest offset from the Kafka topic in Confluent kafka C# library?
Asked Answered
P

4

7

I am using Confluent kafka C# client. how to get the latest offset consumed from a topic in this?

Pectoralis answered 8/7, 2017 at 7:1 Comment(0)
O
7

In addition to previous answer, you can use

List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions)

It will return the last offset polled from librdkafka for given topic/partitions

You have a similar Committed method, for the latest committed offset from consumer


Also you can query the latest known offsets

WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)

it will send a request to kafka cluster. The call is blocking, set a proper timeout. Currently, you cannot send a request on multiple partition at once. You can use it either to get last known offset, either to compute lag

There is also

WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)

which will query the internal state in librdkafka, and could return INVALID_OFFSET (-1001). You can use it to detect some lag due to processing the data. (difference between position and result of this method)

Ousel answered 10/7, 2017 at 10:58 Comment(0)
G
1

When you receive a message it should include the topic, partition, and offset from where it came (in addition to the Key and Value).

From the example here:

consumer.OnMessage += (_, msg)
  => Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} " +
        $"Offset: {msg.Offset} {msg.Value}");

You also get an event when it reaches the end of the each topic partition

consumer.OnPartitionEOF += (_, end)
  => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}" +
          $" , next message will be at offset {end.Offset}");
Gardol answered 10/7, 2017 at 1:48 Comment(0)
A
1

Instead of retrieving offset information from consumer (I didn't want to consume message first) I was able to read topic offsets (high and low) from producer like this:

var partitionOffset = _producer.QueryWatermarkOffsets(new TopicPartition("myTopic", myPartition), TimeSpan.FromSeconds(10));
Annihilator answered 22/2, 2018 at 16:23 Comment(0)
N
0

You can use the Position() method from the consumer like:

var partition = 0;
Offset lastOffset = _kafkaConsumer.Position(new TopicPartition("topic", partition));
Northrop answered 25/7, 2022 at 10:41 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.