How to get latest offset for a partition for a kafka topic?

I am using the Python high level consumer for Kafka and want to know the latest offsets for each partition of a topic. However I cannot get it to work.

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

for p in ps:
    print("For partition %s highwater is %s" % (p.partition, con.highwater(p)))

print("Subscription = %s" % con.subscription())
print("con.seek_to_beginning() = %s" % con.seek_to_beginning())
print("con.seek_to_end() = %s" % con.seek_to_end())

But the output I get is

For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None

I have an alternate approach using assign, but the result is the same:

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

# Explicitly assign the partitions instead of subscribing to the topic
con.assign(ps)
for p in ps:
    print("For partition %s highwater is %s" % (p.partition, con.highwater(p)))

print("Subscription = %s" % con.subscription())
print("con.seek_to_beginning() = %s" % con.seek_to_beginning())
print("con.seek_to_end() = %s" % con.seek_to_end())

It seems from some of the documentation that I might get this behaviour if a fetch has not been issued. But I cannot find a way to force that. What am I doing wrong?

Or is there a different/simpler way to get the latest offsets for a topic?

Lovett answered 16/2, 2016 at 12:13 Comment(1)
Not 100% positive, but I think your code is returning the value of highwater before kafka-python has actually connected to the broker. Since KafkaConsumer is async, I think you have to actually consume a message for the highwater value to be populated.

Finally, after spending a day on this and several false starts, I was able to find a solution and get it working. Posting it here so that others may refer to it.

from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.common import OffsetRequestPayload

client = SimpleClient(brokers)

partitions = client.topic_partitions[topic]
# time=-1 asks for the latest offset; max_offsets=1 returns one offset per partition
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]

offsets_responses = client.send_offset_request(offset_requests)

for r in offsets_responses:
    print("partition = %s, offset = %s" % (r.partition, r.offsets[0]))
Lovett answered 16/2, 2016 at 16:53 Comment(3)
Is there a way to get the current/next offset per consumer/group per partition? - Garniture
Sadly, the SimpleClient has been deprecated, and the offsets_responses above yields a FailedPayloadsError: FailedPayloadsError - Isiah
@Isiah it worked for me, but Itamar Lavender's answer using the non-deprecated parts below works too. If you don't have a group yet, skip the "lag" part and that works as well. - Hammett

If you wish to use the Kafka shell scripts present in kafka/bin, you can get the latest and smallest offsets with the Get Offsets Shell.

To get the latest offset, the command looks like this:

bin/ --broker-list localhost:9092 --time -1 --topic topicname

To get the smallest offset, the command looks like this:

bin/ --broker-list localhost:9092 --time -2 --topic topicname

You can find more information in the documentation for the Get Offsets Shell.
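
If you want to consume that output from Python, here is a minimal sketch. It assumes the tool prints one line per partition in the form topic:partition:offset. The function name parse_get_offsets_output and the sample text are made up for illustration and are not part of Kafka.

```python
def parse_get_offsets_output(text):
    """Parse 'topic:partition:offset' lines into {partition: offset}."""
    offsets = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split from the right: the last two fields are partition and offset.
        _topic, partition, offset = line.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets


# Illustrative sample only, not captured from a real cluster.
sample = "topicname:0:42\ntopicname:1:17\n"
print(parse_get_offsets_output(sample))
```

In practice you would feed it the captured standard output of the command, for example via subprocess.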

Hope this helps!

Lasonde answered 16/2, 2016 at 20:32 Comment(2)
the simplest solution 👍🏼 - Yuki
It does not seem to work with SSL - Ovule
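from kafka import KafkaConsumer, TopicPartition

# TOPIC and GROUP are placeholders; substitute your own topic and consumer group
TOPIC = 'MYTOPIC'
GROUP = 'MYGROUP'
BOOTSTRAP_SERVERS = ['kafka01:9092', 'kafka02:9092']

consumer = KafkaConsumer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    group_id=GROUP,
    enable_auto_commit=False
)

for p in consumer.partitions_for_topic(TOPIC):
    tp = TopicPartition(TOPIC, p)
    consumer.assign([tp])
    committed = consumer.committed(tp)  # None if the group has not committed yet
    consumer.seek_to_end(tp)  # move to the end so position() reports the latest offset
    last_offset = consumer.position(tp)
    lag = last_offset - committed if committed is not None else None
    print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, lag))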
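
The lag arithmetic can also be pulled out into a small helper. This is only a sketch: compute_lag is a hypothetical name, and it assumes you have already collected the committed and latest offsets into plain dictionaries keyed by partition number. A committed offset of None (no commit yet for that group) yields a lag of None instead of raising a TypeError.

```python
def compute_lag(committed, end_offsets):
    """Return {partition: lag}; lag is None when nothing was committed yet."""
    lag = {}
    for partition, end in end_offsets.items():
        c = committed.get(partition)
        # Clamp at zero in case the committed offset is momentarily ahead.
        lag[partition] = None if c is None else max(end - c, 0)
    return lag


# Made-up numbers for illustration.
print(compute_lag({0: 90, 1: None}, {0: 100, 1: 5}))
```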

Curtain answered 21/6, 2017 at 20:22 Comment(1)
Since this question still draws attention, I want to explain why my answer above doesn't directly answer the question: in my opinion, the last offset of a topic/partition is only relevant in the context of a consumer group. Kafka is built for many consumer groups consuming the same data from the same topics, so what I find important is a group's rate of consumption or, more importantly, its lag. - Curtain

With kafka-python>=1.3.4 you can use end_offsets:


Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

# Returns a dict mapping each TopicPartition to its last offset
con.end_offsets(ps)
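
Because the last offset is "last available message + 1", an empty partition and a partition holding one message are easy to confuse. A minimal sketch that combines end_offsets with beginning_offsets (also available on KafkaConsumer) to tell them apart; last_message_offsets is a hypothetical helper name, and the consumer is passed in so that nothing here needs a broker until you call it.

```python
def last_message_offsets(consumer, partitions):
    """Map each partition to the offset of its last available message, or None if empty."""
    begin = consumer.beginning_offsets(partitions)
    end = consumer.end_offsets(partitions)
    result = {}
    for tp in partitions:
        # end_offsets is "last message + 1"; the partition is empty when begin == end.
        result[tp] = end[tp] - 1 if end[tp] > begin[tp] else None
    return result


# Usage sketch (needs a reachable broker; `con` and `ps` as defined above):
# print(last_message_offsets(con, ps))
```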

Introduction answered 11/12, 2018 at 15:0 Comment(0)

Another way to achieve this is to poll the consumer to obtain the last consumed offset and then use the seek_to_end method to move to the most recent available offset of each assigned partition.

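from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])  # group and broker are placeholders
consumer.poll()
consumer.seek_to_end()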

This method particularly comes in handy when using consumer groups.


Forecastle answered 3/8, 2017 at 16:20 Comment(3)
My server has hundreds of messages, yet consumer.poll() returned {} - Spellbind
This could happen if you are running more consumer instances than there are partitions for that topic. - Forecastle
Good point. After the fact, I was able to determine that we weren't calling .close, so that very circumstance occurred even though we thought there was only one consumer. - Spellbind

Using confluent-kafka-python

You can use position:

Retrieve current positions (offsets) for the list of partitions.

from confluent_kafka import Consumer, TopicPartition

# "my-group" is a placeholder; the Consumer needs a group.id to be configured
consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "my-group"})
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in list(topic.topics['topicName'].partitions.keys())]

offset_per_partition = consumer.position(partitions)

Alternatively, you can also use get_watermark_offsets but you'd have to pass one partition at a time and thus it requires multiple calls:

Retrieve low and high offsets for partition.

from confluent_kafka import Consumer, TopicPartition

# "my-group" is a placeholder; the Consumer needs a group.id to be configured
consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "my-group"})
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in list(topic.topics['topicName'].partitions.keys())]

for p in partitions:
    # get_watermark_offsets takes a single TopicPartition, not a list
    low_offset, high_offset = consumer.get_watermark_offsets(p)
    print(f"Latest offset for partition {p.partition}: {high_offset}")
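
If the broker is slow or unreachable, get_watermark_offsets can return None on timeout, and unpacking that result fails. A hedged sketch of a loop that tolerates this; high_watermarks is a hypothetical helper name and the 5-second timeout is an arbitrary choice.

```python
def high_watermarks(consumer, partitions, timeout=5.0):
    """Return {partition_id: high_offset}, skipping partitions that timed out."""
    result = {}
    for tp in partitions:
        # One call per partition: the method takes a single TopicPartition.
        offsets = consumer.get_watermark_offsets(tp, timeout=timeout)
        if offsets is None:  # returned when the request times out
            continue
        _low, high = offsets
        result[tp.partition] = high
    return result


# Usage sketch (needs a reachable broker; `consumer` and `partitions` as above):
# print(high_watermarks(consumer, partitions))
```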

Using kafka-python

You can use end_offsets:

Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

This method does not change the current consumer position of the partitions.

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
partitions = [TopicPartition('myTopic', p) for p in consumer.partitions_for_topic('myTopic')]
last_offset_per_partition = consumer.end_offsets(partitions)
Elfish answered 31/5, 2020 at 12:20 Comment(4)
Exception has occurred: TypeError expected cimpl.TopicPartition - Hydrocellulose
Which of the two libraries? - Elfish
@Hydrocellulose Which line is causing the issue? - Elfish

kafka-consumer-groups --bootstrap-server host1:9093,crow-host2:9093,host3:9093 --command-config=/root/ --describe --group atlas

This command shows the status of the consumer group, including the offsets and lag for each partition.
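
To post-process that output in Python, one option is to key each row by the header line rather than by fixed positions, since the exact set of columns varies between Kafka versions. A rough sketch under that assumption; parse_describe_output is a hypothetical name, and the sample text is hand-written for illustration, not real tool output.

```python
def parse_describe_output(text):
    """Return a list of row dicts keyed by the column names in the header line."""
    rows, header = [], None
    for line in text.splitlines():
        fields = line.split()
        if not fields:
            continue
        if header is None:
            # Treat the first line that contains a LAG column as the header.
            if "LAG" in fields:
                header = fields
            continue
        # Ignore lines (warnings, notes) that do not match the header width.
        if len(fields) == len(header):
            rows.append(dict(zip(header, fields)))
    return rows


# Hand-written sample for illustration only.
sample = """\
GROUP  TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
atlas  mytopic  0          90              100             10
atlas  mytopic  1          40              40              0
"""
for row in parse_describe_output(sample):
    print(row["PARTITION"], row["LOG-END-OFFSET"], row["LAG"])
```

Values are left as strings because some columns can contain "-" when no consumer is active.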

Chere answered 5/5, 2022 at 11:26 Comment(0)

Using kafka-python

While defining the consumer, the auto_offset_reset argument can be set to either 'earliest' or 'latest'. This is useful in case the consumer starts after the retention period and/or restarts after a failure: messages will then be consumed according to the auto.offset.reset configuration.

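from json import loads
from kafka import KafkaConsumer
consumer = KafkaConsumer(
     'my-topic', bootstrap_servers=['localhost:9092'],  # placeholder topic and broker
     auto_offset_reset='earliest',  # or 'latest'
     value_deserializer=lambda x: loads(x.decode('utf-8')))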


Ruching answered 31/12, 2022 at 8:14 Comment(0)
