Kafkacat consume between timestamps giving wrong results when counting records

I want to count the number of messages in a given Kafka topic between two timestamps. I tried doing this with kafkacat, using the following command:

# START_DATE = 01.04.2022 02:00:00Z
# END_DATE = 01.04.2022 02:05:00Z
$ kafkacat -C -b broker:9092 -t mytopic -o s@1648778400000 -o e@1648778700000 -p 0 -f '[ts %T] [partition %p] [offset %o] %k\n' -e -c 1
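The millisecond values in the command correspond to the UTC dates in the comments. A minimal sketch for producing them, assuming GNU date (the -d option is a GNU extension, not POSIX); to_epoch_ms is a hypothetical helper name:

```shell
# to_epoch_ms: convert a UTC date/time string to epoch milliseconds.
# Assumes GNU date: -u makes both parsing and output use UTC,
# -d parses an arbitrary date string.
to_epoch_ms() {
  local secs
  secs=$(date -u -d "$1" +%s) || return 1
  echo $(( secs * 1000 ))
}

# Usage with the dates from the question:
#   to_epoch_ms '2022-04-01 02:00:00'   # 1648778400000
#   to_epoch_ms '2022-04-01 02:05:00'   # 1648778700000
```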

In fact, this is the same approach given as the answer to a very similar question.

According to kafkacat --help:

Consumer options:
  -o <offset>        Offset to start consuming from:
                     beginning | end | stored |
                     <value>  (absolute offset) |
                     -<value> (relative offset from end)
                     s@<value> (timestamp in ms to start at)
                     e@<value> (timestamp in ms to stop at (not included))
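Read literally, the help text describes a half-open window: a record is included when s@ <= timestamp < e@. As a cross-check that does not depend on kcat's own timestamp-to-offset resolution, here is a sketch that applies the same rule client-side. It assumes the kcat format string puts %T (the message timestamp) first on each line; count_in_window is a hypothetical name, and because it reads the partition from the beginning it is only practical for modest partitions.

```shell
# count_in_window START_MS STOP_MS
# Reads lines from stdin whose first field is a timestamp in ms
# (e.g. produced with kcat -f '%T\n') and prints how many fall
# in the half-open window [START_MS, STOP_MS).
count_in_window() {
  awk -v start="$1" -v stop="$2" '
    $1 >= start && $1 < stop { n++ }
    END { print n + 0 }
  '
}

# Usage (requires a reachable broker):
#   kcat -C -b broker:9092 -t mytopic -p 0 -o beginning -e -f '%T\n' \
#     | count_in_window 1648778400000 1648778700000
```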

Accordingly, I would expect the above command to give me the first record with a timestamp at or after s@<value> and before e@<value>. However, it instead gives me a record with a timestamp earlier than s@<value> (in fact, it just gives me the first record in partition 0):

# output of above command
[ts 1648692486141] [partition 0] [offset 2] 643b0013-b3e1-47a5-a9d3-7478c0e91ca4

Am I misunderstanding the consumer options s@<value> and e@<value>?

Kafkacat version:

Version 1.5.0 (JSON, librdkafka 1.2.1 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer)

Additionally, I'm seeing some odd behaviour even with just s@<value>. For example:

kafkacat -C -b broker:9092 -t mytopic -o s@1648778400000 -p 0 -f '[ts %T] [partition %p] [offset %o] %k\n' -e -c 1

should, as I understand it, output the first record with record.timestamp ≥ 1648778400000. The actual output is different:

[ts 1648692486141] [partition 0] [offset 2] 643b0013-b3e1-47a5-a9d3-7478c0e91ca4

and contains a timestamp prior to the one I set (31.03.2022 02:08:06Z vs. 01.04.2022 02:00:00Z).
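The human-readable dates can be checked by converting the millisecond timestamp back to UTC. A sketch, again assuming GNU date (which accepts @SECONDS with -d); from_epoch_ms is a hypothetical helper, and it simply truncates the milliseconds:

```shell
# from_epoch_ms: print an epoch-millisecond timestamp as a UTC date in the
# DD.MM.YYYY HH:MM:SSZ style used in the question. Assumes GNU date.
# Integer division drops the millisecond part.
from_epoch_ms() {
  date -u -d "@$(( $1 / 1000 ))" '+%d.%m.%Y %H:%M:%SZ'
}

# Usage:
#   from_epoch_ms 1648692486141   # 31.03.2022 02:08:06Z
#   from_epoch_ms 1648778400000   # 01.04.2022 02:00:00Z
```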

The output is the same when I test with docker run edenhill/kcat:1.7.1 (the output above is from the Ubuntu kafkacat package).

Inofficious asked 26/9, 2022 at 11:52
Inofficious: For the record, since this seems like a bug, I have raised an issue over at the kcat issue tracker on GitHub.

I don't think you can provide -o multiple times, so your options include:

-o e@1648778700000 -p 0  -c 1

which reads one message from partition 0 with a timestamp earlier than 1648778700000.


To properly consume between timestamps, find the offsets for the start timestamp, commit them to a consumer group, and then start a consumer in that group with your end timestamp.
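A rough sketch of the lookup step, assuming kcat's query mode (-Q with -t topic:partition:timestamp), which resolves a timestamp to the earliest offset whose timestamp is at or after it. Taking the last field of the first output line as the offset is an assumption about -Q's output format, so check it against your kcat version. The broker address and topic are placeholders. Instead of committing offsets to a group, this sketch just subtracts the two resolved offsets, which only approximates a record count: on compacted topics or with transaction markers, offsets and records do not line up one to one.

```shell
# Hypothetical placeholders: adjust for your cluster.
BROKER="broker:9092"
TOPIC="mytopic"

# offset_for_ts PARTITION TS_MS
# Asks kcat query mode for the earliest offset whose timestamp is >= TS_MS.
# ASSUMPTION: the offset is the last field of the first output line.
offset_for_ts() {
  kcat -b "$BROKER" -Q -t "$TOPIC:$1:$2" | awk 'NR == 1 { print $NF }'
}

# offset_span START_OFFSET END_OFFSET
# Prints END - START, the number of offsets in [START, END).
# A negative offset means the lookup did not resolve (e.g. the timestamp
# lies after the last record), so the span is refused rather than guessed.
offset_span() {
  if [ "$1" -lt 0 ] || [ "$2" -lt 0 ]; then
    echo "unresolved offset: $1 $2" >&2
    return 1
  fi
  echo $(( $2 - $1 ))
}

# Usage (requires a reachable broker):
#   start=$(offset_for_ts 0 1648778400000)
#   end=$(offset_for_ts 0 1648778700000)
#   offset_span "$start" "$end"
```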

Cumine answered 26/9, 2022 at 13:16
Inofficious: Ah, I see. That's a bummer. However, this doesn't quite explain why I get that particular record even with just s@<value>. I've updated the question with more details.
Cumine: That's my understanding of s@ as well, so I'm not really sure. Perhaps a version issue? Your Docker image is 1.7.1, but the version output you show is 1.5.0. Also, I could be wrong about multiple -o options, since the last example at the bottom of github.com/edenhill/kcat#examples uses the syntax you've given.
Inofficious: Sorry if it was unclear: I tried both a local kafkacat 1.5.0 and the kcat 1.7.1 Docker image. That's quite peculiar, then. I'll try opening an issue on the tracker. Thanks for the help nonetheless.
