Here's what I've tried so far:
from confluent_kafka import Consumer
c = Consumer({... several security/server settings skipped...
'auto.offset.reset': 'beginning',
'group.id': 'my-group'})
c.subscribe(['my.topic'])
msg = c.poll(30.0)  # msg is None
msg almost always ends up being None, though. I think the issue might be that 'my-group' has already consumed all the messages for 'my.topic', but I don't care whether a message has already been consumed or not: I still need the latest message. Specifically, I need the timestamp from that latest message.
I tried a bit more, and from this it looks like there are probably 25 messages in the topic, but I have no idea how to get at them:
a = c.assignment()
print(a) # Outputs [TopicPartition{topic=my.topic,partition=0,offset=-1001,error=None}]
offsets = c.get_watermark_offsets(a[0])
print(offsets) # Outputs: (25, 25)
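Based on those watermarks, this is roughly what I'm thinking of trying next. It's an untested sketch: it assumes the high watermark is the offset the next message will get, so the newest message (if one is still retained) sits at high - 1, and that low == high means nothing is currently readable, which, if I understand the watermarks correctly, may be exactly the (25, 25) case above. The helper names last_offset and latest_timestamp_ms are my own, and partition 0 and the timeouts are placeholders.

```python
def last_offset(low, high):
    """Offset of the newest retained message, or None if the partition is empty."""
    # high is the offset the *next* produced message would get, so the
    # newest existing message (if any) is at high - 1.
    return high - 1 if high > low else None


def latest_timestamp_ms(consumer, topic, partition=0, timeout=10.0):
    """Timestamp (ms since epoch) of the newest message, ignoring committed offsets."""
    # Imported here so last_offset() stays usable without the client installed.
    from confluent_kafka import TopicPartition

    low, high = consumer.get_watermark_offsets(
        TopicPartition(topic, partition), timeout=timeout)
    offset = last_offset(low, high)
    if offset is None:
        return None  # nothing currently retained in this partition

    # assign() with an explicit offset sidesteps the group's committed position.
    consumer.assign([TopicPartition(topic, partition, offset)])
    msg = consumer.poll(timeout)
    if msg is None or msg.error():
        return None
    _ts_type, ts_ms = msg.timestamp()  # returns (timestamp_type, timestamp)
    return ts_ms
```

I'd call it as latest_timestamp_ms(c, 'my.topic'). On a compacted or transactional topic the record at high - 1 might not be a normal message, so I wouldn't rely on this there.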
If there are no messages because the topic has never had anything written to it at all, how can I determine that? And if that's the case, how can I determine how long the topic has existed? I'm looking to write a script that automatically deletes any topics that haven't been written to in the past X days (14 initially; I'll probably tweak it over time).
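For the cleanup script itself, the age check I have in mind is roughly the following. It's only a sketch: it assumes the timestamp I get back is milliseconds since the Unix epoch, and the names is_stale and empty_is_stale are hypothetical. What to do with a topic that has no readable messages is exactly the part I'm unsure about, so by default it is not treated as stale.

```python
from datetime import datetime, timedelta, timezone


def is_stale(last_ts_ms, max_age_days=14, now=None, empty_is_stale=False):
    """Return True if the last write is older than max_age_days."""
    if last_ts_ms is None:
        # No readable message: the age is unknown, so fall back to the flag.
        return empty_is_stale
    now = now or datetime.now(timezone.utc)
    # Convert milliseconds since the epoch to an aware UTC datetime.
    last_write = datetime.fromtimestamp(last_ts_ms / 1000, tz=timezone.utc)
    return now - last_write > timedelta(days=max_age_days)
```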