I want to count the number of messages in a given Kafka topic between two timestamps. I tried doing this using kafkacat
, using the following command:
# START_DATE = 01.04.2022 02:00:00Z
# END_DATE = 01.04.2022 02:05:00Z
$ kafkacat -C -b broker:9092 -t mytopic -o s@1648778400000 -o e@1648778700000 -p 0 -f '[ts %T] [partition %p] [offset %o] %k\n' -e -c 1
In fact, this is the same approach that is listed as the answer in a very similar question.
According to kafkacat --help
:
Consumer options:
-o <offset> Offset to start consuming from:
beginning | end | stored |
<value> (absolute offset) |
-<value> (relative offset from end)
s@<value> (timestamp in ms to start at)
e@<value> (timestamp in ms to stop at (not included))
Correspondingly, I would expect the above command to give me the first record that has a timestamp greater than s@<value>
and smaller than e@<value>
. However, it instead gives me a record that has a timestamp prior to s@<value>
(in fact, it just gives me the first record in partition 0
):
# output of above command
[ts 1648692486141] [partition 0] [offset 2] 643b0013-b3e1-47a5-a9d3-7478c0e91ca4
Am I misunderstanding the consumer options s@<value>
and e@<value>
?
Kafkacat version:
Version 1.5.0 (JSON, librdkafka 1.2.1 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer)
Additionally, I'm seeing some odd behaviour even with just s@<value>
. For example:
kafkacat -C -b broker:9092 -t mytopic -o s@1648778400000 -p 0 -f '[ts %T] [partition %p] [offset %o] %k\n' -e -c 1
should, as I understand it, output the first record with record.timestamp ≥ 1648778400000
. The actual output is different:
[ts 1648692486141] [partition 0] [offset 2] 643b0013-b3e1-47a5-a9d3-7478c0e91ca4
and contains a timestamp prior to the one I set (31.03.2022 02:08:06Z
vs. 01.04.2022 02:00:00Z
).
This output is the same when I tested using docker run edenhill/kcat:1.7.1
(the above was an Ubuntu kafkacat
)