Is it possible to retrieve messages in Kafka console conumer for a particular timestamp range?
For example kafka messags between 08:00 to 09:00 yesterday.
Is it possible to retrieve messages in Kafka console conumer for a particular timestamp range?
For example kafka messags between 08:00 to 09:00 yesterday.
You can use kcat
for consuming messages between two timestamps:
kcat -b localhost:9092 -C -t mytopic -o s@1568276612443 -o e@1568276617901
where
s@
denotes the starting timestamp in mse@
denotes the ending timtestamp in ms (non-inclusive)1568276612443
, what is this?? a millisecond or nanosecond and if yes , from when does it began?? start of the day or start of some particular day?? –
Ostensive Yes, you can do it since Kafka version 0.10.1.
Use the function offsetsForTimes
in KafkaConsumer:
Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. This is a blocking call. The consumer does not have to be assigned the partitions.
KafkaConsumer
api just to retrieve the relevant offset, then use --offset myoffset
in the kafka-console-consumer
, I don't think there is other solution rather than use --property print.timestamp=true
in kafka-console-consumer
and filter the printed time for each message using grep
in order to fit your range. If this is a good direction for you I can write it in more details. –
Fanchon kafka-run-class.sh kafka.tools.GetOffsetShell ... --time <Long: timestamp>
. Then, pass that offset to kafka-console-consumer. –
Trivium refer: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#id14
from confluent_kafka import Consumer,KafkaError,TopicPartition
import argparse
import os
broker_list = '10.100.110.35:9092'
group = 'kcat_consumer'
consumer_conf = {'bootstrap.servers': broker_list,
'group.id': group,
# 'auto.offset.reset': 'earliest'
'enable.auto.commit': False,
}
consumer = Consumer(consumer_conf)
partition = 0
timestamp_ms = 1618258800000
timestamp_offsets = {partition: timestamp_ms}
topic = 'hw-blacklist-1195'
tp = TopicPartition(topic, partition, timestamp_ms)
offsets = consumer.offsets_for_times([tp])
print(offsets[0].offset)
consumer.assign([TopicPartition(topic,0,offsets[0].offset)])
while True:
msg = consumer.poll(1.0,)
if not msg:
print(msg)
break
print(msg.value())
© 2022 - 2025 — McMap. All rights reserved.
s@
denotes the starting timestamp ande@
denotes the ending timestamp. – Miaow