How to consume messages between two timestamps using Kafka Console Consumer
O

3

18

Is it possible to retrieve messages with the Kafka console consumer for a particular timestamp range?

For example, Kafka messages between 08:00 and 09:00 yesterday.

Ostensive answered 27/3, 2020 at 9:6 Comment(0)
M
19

You can use kcat (formerly kafkacat) to consume messages between two timestamps:

kcat -b localhost:9092 -C -t mytopic -o s@1568276612443 -o e@1568276617901

where

  • s@ denotes the starting timestamp, in milliseconds since the Unix epoch
  • e@ denotes the ending timestamp, in milliseconds since the Unix epoch (non-inclusive)
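
The values after s@ and e@ are milliseconds since the Unix epoch (1970-01-01 00:00:00 UTC). As a minimal sketch, the bounds for "yesterday, 08:00 to 09:00" could be computed as below. The helper names (to_epoch_ms, from_epoch_ms, yesterday_window) are made up for this illustration, and UTC is an assumption; substitute your own time zone if the window is meant in local time.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt):
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    # Integer timedelta division avoids float rounding on the millisecond part.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def from_epoch_ms(ms):
    """Convert milliseconds since the Unix epoch back to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


def yesterday_window(now, start_hour=8, end_hour=9):
    """Return (start_ms, end_ms) for yesterday's [start_hour, end_hour) window."""
    midnight = (now - timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    start = midnight + timedelta(hours=start_hour)
    end = midnight + timedelta(hours=end_hour)
    return to_epoch_ms(start), to_epoch_ms(end)


if __name__ == "__main__":
    # The example value from the kcat command above, decoded to a UTC datetime.
    print(from_epoch_ms(1568276612443).isoformat())
    start_ms, end_ms = yesterday_window(datetime.now(timezone.utc))
    print(f"-o s@{start_ms} -o e@{end_ms}")
```

The last line prints the two -o arguments in the form used by the kcat command above.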
Miaow answered 27/3, 2020 at 11:12 Comment(5)
@Hummingbird It's a timestamp. You can use any timestamp you wish. s@ denotes the starting timestamp and e@ denotes the ending timestamp. - Miaow
Yes, but what is the number 1568276612443? Is it in milliseconds or nanoseconds, and if so, counted from when: the start of the day, or the start of some particular day? - Ostensive
@Hummingbird OK. What is the timestamp interval you want to consume from? - Miaow
That's OK, I found the timestamps I wanted. Anyway, the server where I wanted to run the command does not have kafkacat. - Ostensive
FWIW, this does not seem to work (anymore?). See this question as well as the corresponding GitHub issue. - Parris
F
3

Yes, you can do it since Kafka version 0.10.1.
Use the offsetsForTimes method of KafkaConsumer:

Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. This is a blocking call. The consumer does not have to be assigned the partitions.
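
To make the quoted lookup rule concrete, here is a toy model in plain Python. It is not Kafka code: a partition is modeled as a list of record timestamps whose index stands in for the offset (real partitions need not start at offset 0), and the function names are invented for this sketch. Looking up both ends of a range gives a half-open offset interval to read.

```python
def offset_for_time(timestamps, target_ms):
    """Return the earliest offset whose timestamp is >= target_ms, or None."""
    # Linear scan on purpose: record timestamps are not guaranteed to be sorted.
    for offset, ts in enumerate(timestamps):
        if ts >= target_ms:
            return offset
    return None


def offset_range(timestamps, start_ms, end_ms):
    """Return (first_offset, end_offset_exclusive) covering [start_ms, end_ms)."""
    first = offset_for_time(timestamps, start_ms)
    if first is None:
        return None  # nothing at or after start_ms
    last = offset_for_time(timestamps, end_ms)
    if last is None:
        last = len(timestamps)  # range runs to the current end of the partition
    return first, last


if __name__ == "__main__":
    partition = [1000, 1005, 1005, 1020]  # made-up timestamps in ms
    print(offset_range(partition, 1005, 1020))
```

This range reasoning assumes timestamps do not decrease with offset; with out-of-order timestamps the offsets between the two lookups can include records outside the time window.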

Fanchon answered 27/3, 2020 at 9:52 Comment(3)
Hey ofek, I was actually looking to do this in the terminal, using the kafka-console-consumer shell script. - Ostensive
You can use the KafkaConsumer API just to retrieve the relevant offset, then pass it to kafka-console-consumer with --offset myoffset (which also requires --partition). Other than that, I don't think there is another solution besides using --property print.timestamp=true in kafka-console-consumer and filtering the printed timestamp of each message with grep to fit your range. If this is a good direction for you, I can write it up in more detail. - Fanchon
Or, if your brokers can talk PLAINTEXT, you can use one of the Kafka tools to get the offset by timestamp: kafka-run-class.sh kafka.tools.GetOffsetShell ... --time <Long: timestamp>. Then, pass that offset to kafka-console-consumer. - Trivium
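
The print.timestamp=true idea from the comments can also be done with a small filter instead of grep. This is a sketch, assuming each printed line starts with a prefix such as CreateTime:1568276612443 followed by a tab and the message value; check the actual output of your Kafka version before relying on it. The script name filter_by_ts.py and the function names are hypothetical.

```python
import re
import sys

# Assumed line shape: "CreateTime:<ms>\t<value>" (or "LogAppendTime:<ms>\t<value>").
TS_PREFIX = re.compile(r"^(?:CreateTime|LogAppendTime):(\d+)\t")


def in_range(line, start_ms, end_ms):
    """True if the line carries a timestamp in the half-open range [start_ms, end_ms)."""
    match = TS_PREFIX.match(line)
    return bool(match) and start_ms <= int(match.group(1)) < end_ms


def filter_lines(lines, start_ms, end_ms):
    """Keep only the lines whose timestamp prefix falls inside the range."""
    return [line for line in lines if in_range(line, start_ms, end_ms)]


if __name__ == "__main__" and len(sys.argv) == 3:
    start, end = int(sys.argv[1]), int(sys.argv[2])
    for stdin_line in sys.stdin:
        if in_range(stdin_line, start, end):
            sys.stdout.write(stdin_line)
```

It could be used by piping the console consumer into it, for example: kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning --property print.timestamp=true | python filter_by_ts.py 1568276612443 1568276617901. This still reads the whole topic and only filters on the client side, and the consumer keeps running until it is interrupted.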
E
-1

Reference (confluent-kafka Python client documentation): https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#id14

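from confluent_kafka import Consumer, KafkaException, TopicPartition

broker_list = '10.100.110.35:9092'
group = 'kcat_consumer'

consumer_conf = {
    'bootstrap.servers': broker_list,
    'group.id': group,
    # 'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}
consumer = Consumer(consumer_conf)

topic = 'hw-blacklist-1195'
partition = 0
timestamp_ms = 1618258800000  # start of the range, in ms since the Unix epoch

# For offsets_for_times(), the offset field of TopicPartition carries the timestamp to look up.
tp = TopicPartition(topic, partition, timestamp_ms)
offsets = consumer.offsets_for_times([tp])
print(offsets[0].offset)

# Start reading at the first offset whose timestamp is >= timestamp_ms.
# Note: this reads from timestamp_ms onward; it does not stop at an end timestamp.
consumer.assign([TopicPartition(topic, partition, offsets[0].offset)])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the timeout: treat it as the end of the data.
            break
        if msg.error():
            raise KafkaException(msg.error())
        print(msg.value())
finally:
    consumer.close()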
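
The script above starts at a timestamp but has no upper bound. One way to add it is to stop at the first message whose timestamp reaches the end of the range. The sketch below uses a hypothetical helper, consume_until, which takes any poll-like callable (for instance consumer.poll) and relies only on the error() and timestamp() methods of confluent-kafka messages. It assumes a single partition with non-decreasing timestamps; with out-of-order timestamps it could stop too early.

```python
def consume_until(poll, end_ms, timeout=1.0):
    """Yield messages from poll(timeout) until one has a timestamp >= end_ms.

    Also stops when poll returns None, i.e. nothing arrived within the timeout.
    """
    while True:
        msg = poll(timeout)
        if msg is None:
            return
        if msg.error():
            raise RuntimeError(str(msg.error()))
        # timestamp() returns a (timestamp_type, timestamp_ms) pair.
        _ts_type, ts_ms = msg.timestamp()
        if ts_ms >= end_ms:
            return  # end of the range is exclusive
        yield msg
```

With the consumer from the script, the loop would become something like: for msg in consume_until(consumer.poll, end_timestamp_ms): print(msg.value()), where end_timestamp_ms is a value you choose for the end of the range.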
Entrepreneur answered 7/11, 2023 at 2:38 Comment(0)
