Counting the number of messages stored in a Kafka topic
R

7

30

I'm using Kafka version 0.9.0.0 and I want to count the number of messages in a topic without using the admin script kafka-console-consumer.sh.

I have tried all the commands in the answers to "Java, How to get number of messages in a topic in apache kafka", but none of them yields a result. Can anyone help me out here?

Rivard answered 22/1, 2017 at 15:23 Comment(2)
Do you want it to work for compacted topics as well? That eliminates a bunch of options, such as comparing the beginning and latest offsets.Gleeson
See my answer here for a solution using the Java client.Rappee
S
71

You could try to execute the command below:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1

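Then sum the offsets reported for each partition. Note that --time -1 returns the latest offset of each partition, so the sum matches the message count only while the earliest offsets are still 0.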
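As a minimal sketch of the summing step, the function below takes the output of two GetOffsetShell runs, one with --time -2 (earliest) and one with --time -1 (latest), and adds up the per-partition differences. It assumes the tool prints one topic:partition:offset line per partition; the function name and the sample offsets are hypothetical.

```shell
# Sum (latest - earliest) per partition from two GetOffsetShell outputs.
# $1: file holding the --time -2 (earliest) output
# $2: file holding the --time -1 (latest) output
# Assumes each line looks like topic:partition:offset.
count_from_offsets() {
  awk -F: '
    NR == FNR { earliest[$2] = $3; next }   # first file: remember earliest offsets
    { total += $3 - earliest[$2] }          # second file: add the difference
    END { print total + 0 }
  ' "$1" "$2"
}

# Hypothetical sample data standing in for real tool output.
earliest_file=$(mktemp)
latest_file=$(mktemp)
printf 'test-topic:0:10\ntest-topic:1:0\ntest-topic:2:5\n' > "$earliest_file"
printf 'test-topic:0:110\ntest-topic:1:75\ntest-topic:2:80\n' > "$latest_file"
count_from_offsets "$earliest_file" "$latest_file"   # prints 250
rm -f "$earliest_file" "$latest_file"
```

With real data, the two files would be replaced by the saved output of the two GetOffsetShell invocations.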

Updated: Java implementation

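import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Note: endOffsets/beginningOffsets need client 0.10.1 or later; poll(Duration) needs 2.0 or later.
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
......
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("your_topic"));
    // Poll until partitions have been assigned to this consumer.
    Set<TopicPartition> assignment;
    while ((assignment = consumer.assignment()).isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
    }
    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
    final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment);
    assert (endOffsets.size() == beginningOffsets.size());
    assert (endOffsets.keySet().equals(beginningOffsets.keySet()));

    // Sum (end offset - beginning offset) over all assigned partitions.
    long totalCount = beginningOffsets.entrySet().stream().mapToLong(entry -> {
            TopicPartition tp = entry.getKey();
            long beginningOffset = entry.getValue();
            long endOffset = endOffsets.get(tp);
            return endOffset - beginningOffset;
        }).sum();
    System.out.println(totalCount);
}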
Shearer answered 23/1, 2017 at 3:50 Comment(1)
You should rather compute the sum of the differences between the latest and earliest offsets. The --time -2 parameter gives the earliest ones.Canon
S
11

Technically speaking you can simply consume all messages from the topic and count them:

Example:

kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic XYZ --partition 0*

However, the kafka.tools.GetOffsetShell approach gives you offsets, not the actual number of messages in the topic. This means that if the topic has been compacted, counting messages by consuming them and counting by reading offsets will give you two different numbers.

Topic compaction: https://kafka.apache.org/documentation.html#design_compactionbasics
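
To make the difference concrete, here is a small sketch using made-up data: a single compacted partition in which only four records survive. The offsets and the function names are hypothetical; in practice the surviving offsets would come from actually consuming the partition.

```shell
# Hypothetical offsets of the records that survived compaction in one partition.
surviving_offsets='0
3
4
7'

# Count by offsets: (last offset + 1) - first offset, i.e. end minus beginning.
count_by_offsets() {
  awk 'NR == 1 { first = $1 } { last = $1 } END { print last + 1 - first }'
}

# Count by consuming: one input line per record actually read.
count_by_consuming() {
  awk 'END { print NR }'
}

printf '%s\n' "$surviving_offsets" | count_by_offsets     # prints 8
printf '%s\n' "$surviving_offsets" | count_by_consuming   # prints 4
```

The offset-based figure is the larger one, because compaction removes records but does not renumber the ones that remain.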

Spatial answered 21/4, 2017 at 3:59 Comment(2)
Reading off potentially untold (millions?) of messages off a topic in Kafka (which are persistent until purged - not like JMS - persistent until read) is not viable unless time is not relative.Mufti
which count could be potentially higher, the offset number or the number of messages consumed? I guess the first?Intellectualize
C
6

You can sum up all the counts with this:

.../bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list <<broker_1>>:9092,<<broker_2>>:9092... --topic <<your_topic_name>> --time -1 | while IFS=: read topic_name partition_id number; do echo "$number"; done | paste -sd+ - | bc
Clairvoyant answered 6/3, 2017 at 11:30 Comment(2)
Thanks! A bit simpler summing: kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_CLUSTER_HOSTS --topic $TOPIC_NAME --time -1 | tr ":" " " | awk '{ sum += $3 } END { print sum }'Chaparajos
@Chaparajos instead of tr you can also use awk -F: :DVitellus
H
2

If you don't want to buy into the hassle around the "original" Kafka scripts, there's also kafkacat.

The basic idea is to

  • consume the last message of each partition and
  • add up the offsets (correcting for zero-based offsets).

Let's develop this.

kafkacat -C -b <broker> -t <topic> -o -1 -f '%p\t%o\n'

This will output something like this (plus "reached end of partition" notices on stderr):

0    77
1    75
2    78

Now, kafkacat doesn't terminate but keeps waiting for new messages. We can circumvent this by adding a timeout (choose a value large enough so you get all partitions in your given environment):

timeout --preserve-status 1 kafkacat <snip>

Now we could go ahead and add up the second column (+1 each) -- but if there are new messages during that timeout interval, we might get something like this:

0    77
1    75
2    78
1    76

So we have to account for this, which is easy enough to do with a little awk:

timeout --preserve-status 1 kafkacat <snip> 2> /dev/null \
| awk '{lastOffsets[$1] = $2} END {count = 0; for (i in lastOffsets) { count += lastOffsets[i] + 1 }; print count}'

Note how we use a (hash)map to remember the last seen offset for each partition until the timeout triggers, and then loop over the array to compute the sum.
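
As a quick check of the awk step, the sketch below feeds it the sample lines from above, including the repeated partition 1, instead of live kafkacat output. The function name is made up for this example.

```shell
# Same awk program as above, wrapped in a function so it can be fed sample input.
sum_last_offsets() {
  awk '{lastOffsets[$1] = $2} END {count = 0; for (i in lastOffsets) { count += lastOffsets[i] + 1 }; print count}'
}

# Sample lines from the example output; partition 1 appears twice (75, then 76).
printf '0\t77\n1\t75\n2\t78\n1\t76\n' | sum_last_offsets   # prints 234
```

Only the later value for partition 1 is counted, so the total is (77 + 1) + (76 + 1) + (78 + 1) = 234.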

Headstream answered 19/11, 2020 at 13:57 Comment(0)
P
2

We can use the kafkacat command to count the number of messages in a topic, as follows. Note that this command also works if your messages are multiline.

kafkacat -b <broker_1_ip:port>,<broker_2_ip:port> -t <topic-name> -C -e -q -f 'Offset: %o\n' | wc -l

Subtract 1 from the number printed on the console, and that's the answer.

Pily answered 1/4, 2021 at 15:27 Comment(0)
S
1

You can also do this using awk and a simple loop:

sum=0; for i in $(kafka-run-class kafka.tools.GetOffsetShell --broker-list broker:9092 --time -1 --topic topic_name | awk -F : '{print $3}'); do sum=$((sum + i)); done; echo "$sum"
Shirtmaker answered 30/10, 2018 at 12:38 Comment(0)
I
0

To get the number of records in a topic:

brokers="<broker1:port>"
topic=<topic-name>
sum_1=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -1 | grep -e ':[[:digit:]]*:' | awk -F  ":" '{sum += $3} END {print sum}')
sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -2 | grep -e ':[[:digit:]]*:' | awk -F  ":" '{sum += $3} END {print sum}')
echo "Number of records in topic ${topic}: "$((sum_1 - sum_2))
Insoluble answered 31/7, 2020 at 11:59 Comment(0)
