Get Latest Message for a Confluent Kafka Topic in Python

Here's what I've tried so far:

from confluent_kafka import Consumer

c = Consumer({... several security/server settings skipped...
              'auto.offset.reset': 'beginning',
              'group.id': 'my-group'})

c.subscribe(['my.topic'])
msg = c.poll(30.0)  # msg is None

msg almost always ends up being None though. I think the issue might be that 'my-group' has already consumed all the messages for 'my.topic'... but I don't care whether a message has already been consumed or not - I still need the latest message. Specifically, I need the timestamp from that latest message.

I tried a bit more, and from this it looks like there are probably 25 messages in the topic, but I have no idea how to get at them:

a = c.assignment()
print(a)  # Outputs [TopicPartition{topic=my.topic,partition=0,offset=-1001,error=None}]
offsets = c.get_watermark_offsets(a[0])
print(offsets)  # Outputs: (25, 25)

If there are no messages because the topic has never had anything written to it at all, how can I determine that? And if that's the case, how can I determine how long the topic has existed for? I'm looking to write a script that automatically deletes any topics that haven't been written to in the past X days (14 initially - will probably tweak it over time.)
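
A note on reading the output above: get_watermark_offsets returns a (low, high) pair, where low is the oldest offset still retained and high is the offset the next message will receive. A result of (25, 25) therefore suggests that 25 offsets were written at some point but nothing is retained now, which would also explain why poll returns None. The sketch below is one way to classify a partition from that pair. describe_watermarks is a hypothetical helper, not part of confluent_kafka, and it assumes offsets are never reused (for example, that the topic was not deleted and recreated).

```python
def describe_watermarks(low, high):
    """Classify one partition from its (low, high) watermark offsets."""
    if high == 0:
        # No offset has ever been assigned in this partition.
        return "never written"
    if low == high:
        # Offsets were assigned, but retention or deletion removed them all.
        return "written before, but nothing retained"
    # Offsets low .. high - 1 are still readable.
    return f"{high - low} offsets retained, newest at {high - 1}"


print(describe_watermarks(25, 25))
```

For a topic with several partitions, the same check would need to run per partition; a topic has never been written to only if every partition reports a high watermark of 0.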

Giant answered 8/2, 2021 at 21:37 Comment(0)

I ran into the same issue and found no example for it. In my case there is one partition, and I need to read the last message to get some information from it for setting up my consumer/producer component.

The logic is: start the Consumer, subscribe to the topic, and poll for a message. The poll triggers on_assign, where the rewinding happens by assigning the modified partitions back. After on_assign finishes, the poll continues and reads the last message from the topic.

from confluent_kafka import Consumer

settings = {
    "bootstrap.servers": "my.kafka.server",
    "group.id": "my-work-group",
    "client.id": "my-work-client-1",
    "enable.auto.commit": False,
    "session.timeout.ms": 6000,
    # set at the top level; "default.topic.config" is deprecated, and "latest" is the same as "largest"
    "auto.offset.reset": "latest",
}
consumer = Consumer(settings)

def on_assign(a_consumer, partitions):
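    # get the (low, high) watermark tuple for the first partition
    last_offset = a_consumer.get_watermark_offsets(partitions[0])
    # high ([1]) is the offset of the next message to be written,
    # so the last existing message sits at high - 1
    partitions[0].offset = last_offset[1] - 1
    # use the consumer passed to the callback rather than the global one
    a_consumer.assign(partitions)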

consumer.subscribe(["test-topic"], on_assign=on_assign)

msg = consumer.poll(6.0)

msg now holds the last message.
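
Since the question asks specifically for the timestamp, here is a minimal sketch of pulling it out of the polled message. It relies on Message.timestamp() returning a (timestamp_type, milliseconds_since_epoch) tuple and on Message.error() returning None for a normal message. message_time is a hypothetical helper name, and the constant is assumed to mirror confluent_kafka.TIMESTAMP_NOT_AVAILABLE; it is written as a literal only so the snippet runs without the library installed.

```python
from datetime import datetime, timezone

# Assumed to match confluent_kafka.TIMESTAMP_NOT_AVAILABLE.
TIMESTAMP_NOT_AVAILABLE = 0


def message_time(msg):
    """Return the message timestamp as an aware UTC datetime, or None."""
    # poll() returns None on timeout; error() is non-None for error events.
    if msg is None or msg.error() is not None:
        return None
    ts_type, ts_ms = msg.timestamp()
    if ts_type == TIMESTAMP_NOT_AVAILABLE:
        return None
    # Kafka timestamps are milliseconds since the Unix epoch.
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
```

With the consumer above, message_time(msg) would give the time of the last message, which can then be compared against a cutoff such as 14 days ago.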

Nutshell answered 9/4, 2021 at 11:15 Comment(1)
This actually helped me. In case you have multiple partitions, you loop over the partitions and update the offset of each. After that, your consumer will get the newest messages. - Lungwort

If anyone still needs an example for the case with multiple partitions, this is how I did it:

from confluent_kafka import OFFSET_END, Consumer

settings = {
    'bootstrap.servers': "my.kafka.server",
    'group.id': "my-work-group",
    'auto.offset.reset': "latest"
}

def on_assign(consumer, partitions):
    for partition in partitions:
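        # OFFSET_END starts at the end of the partition, so poll() only
        # returns messages produced after the assignment
        partition.offset = OFFSET_END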
    consumer.assign(partitions)


consumer = Consumer(settings)

consumer.subscribe(["test-topic"], on_assign=on_assign)

msg = consumer.poll(1.0)
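
Note that OFFSET_END places the consumer after the last existing message, so the poll above returns something only once a new message arrives. If the goal is the last message already stored in each partition, a sketch along the following lines may be closer. It combines the watermark approach from the earlier answer with a loop over partitions. latest_messages and rewind_to_last are hypothetical helper names; the Consumer calls used (list_topics, get_watermark_offsets, assign, poll) are from confluent_kafka, which is imported lazily so the pure helper can be exercised without it.

```python
def rewind_to_last(low, high):
    """Offset of the newest retained message, or None if nothing is retained."""
    return high - 1 if high > low else None


def latest_messages(consumer, topic, timeout=10.0):
    """Return {partition_id: Message} holding the last message per partition."""
    # Imported here so rewind_to_last works without confluent-kafka installed.
    from confluent_kafka import TopicPartition

    metadata = consumer.list_topics(topic, timeout=timeout)
    latest = {}
    for partition_id in metadata.topics[topic].partitions:
        marks = consumer.get_watermark_offsets(
            TopicPartition(topic, partition_id), timeout=timeout
        )
        if marks is None:  # the watermark request timed out
            continue
        offset = rewind_to_last(*marks)
        if offset is None:  # empty partition, nothing to read
            continue
        # Assign only this partition, positioned at its last offset.
        consumer.assign([TopicPartition(topic, partition_id, offset)])
        msg = consumer.poll(timeout)
        if msg is not None and msg.error() is None:
            latest[partition_id] = msg
    return latest
```

This uses assign directly instead of subscribe, so no group rebalance is involved. On topics written with transactions the final offset may be a control record rather than a message, in which case the poll for that partition can come back empty.
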
Dextrorotation answered 1/2, 2023 at 8:30 Comment(0)
