Java, How to get number of messages in a topic in apache kafka
Asked Answered
S

18

132

I am using Apache Kafka for messaging. I have implemented the producer and consumer in Java. How can we get the number of messages in a particular topic?

Shipyard answered 18/2, 2015 at 9:25 Comment(1)
My answer gives you an actual way of doing this instead of just consuming the messages: https://mcmap.net/q/172578/-java-how-to-get-number-of-messages-in-a-topic-in-apache-kafkaParrot
S
43

The only way that comes to mind for this from a consumer point of view is to actually consume the messages and count them.

The Kafka broker exposes JMX counters for the number of messages received since start-up, but you cannot know how many of them have already been purged.

In most common scenarios, messages in Kafka are best seen as an infinite stream, and getting a discrete count of how many are currently kept on disk is not relevant. Furthermore, things get more complicated when dealing with a cluster of brokers that each hold a subset of the messages in a topic.

Salesroom answered 19/2, 2015 at 21:57 Comment(1)
See my answer https://mcmap.net/q/172578/-java-how-to-get-number-of-messages-in-a-topic-in-apache-kafka. The Java Kafka client allows to get that information.Isia
K
128

It is not Java, but it may be useful:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list <broker>:<port> \
  --topic <topic-name> \
  | awk -F  ":" '{sum += $3} END {print sum}'
Kampmann answered 15/2, 2016 at 20:2 Comment(9)
Shouldn't this be difference of earliest and latest offset per partition sum? bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609 And then the difference returns actual pending messages in topic? Am I correct?Tiu
Yes, that's true. You have to calculate a difference if the earliest offsets do not equal zero.Kampmann
That's what I thought :).Tiu
Is there ANY way to use that as an API and so inside a code (JAVA, Scala or Python)?Baca
Here is a mix of my code and code from Kafka. It may be useful. I used it for Spark streaming - Kafka integration KafkaClient gist.github.com/ssemichev/c2d94dce7ad65339c9637e1b461f86cf KafkaCluster gist.github.com/ssemichev/fa3605c7b10cb6c7b9c8ab54ffbc5865Kampmann
Can somebody help me find out how can i pass SASL config in this command?Dorinda
simplifying @Tiu answer to an exact record count: brokers="<broker1:port>" topic=<topic-name> sum_1=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -1 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}') sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -2 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}') echo "Number of records in topic ${topic}: "$((sum_1 - sum_2))Warmongering
careful if it's a compacted topicMerwin
This answer is wrong because this command only gives the latest offset. You need to combine it with the --time parameter and then use a shell script to correlate the results per partition. Even that is not accurate, as there can be missing offsets in a series, for example in compacted topics.Roeder
B
35

Since ConsumerOffsetChecker is no longer supported, you can use this command to check all messages in a topic:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --group my-group \
    --bootstrap-server localhost:9092 \
    --describe

Where LAG is the count of messages in a topic partition:

(screenshot of the describe output, showing the LAG column)

You can also try kafkacat, an open-source tool that reads messages from a topic and partition and prints them to stdout. Here is a sample that reads the last 10 messages from the sample-kafka-topic topic, then exits:

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
Bruns answered 19/2, 2019 at 9:29 Comment(1)
This answer is lacking a bit in precision. LAG is the amount of messages that are pending to be consumed by a consumer. Is not the total of messages in the partition. A value a bit more accurate for the TOTAL of messages in the partitions (but still somewhat misleading) would be LOG-END-OFFSET.Trotline
L
23

I actually use this for benchmarking my POC. The tool you want is ConsumerOffsetChecker. You can run it from a bash script like below.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

And below is the result: (screenshot of the ConsumerOffsetChecker output). As you can see in the red box, 999 is the number of messages currently in the topic.

Update: ConsumerOffsetChecker is deprecated since 0.10.0, you may want to start using ConsumerGroupCommand.

Langton answered 19/7, 2016 at 9:40 Comment(3)
Please note that ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)Conto
Yeah, that's what I said.Langton
Your last sentence is not accurate. The above command still works in 0.10.0.1 and the warning is the same as my previous comment.Conto
C
19

Sometimes the interest is in knowing the number of messages in each partition, for example, when testing a custom partitioner. The following steps have been tested to work with Kafka 0.10.2.1-2 from Confluent 3.2. Given a Kafka topic, kt, and the following command line:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host03:9092 --topic kt

That prints sample output showing the count of messages in each of the three partitions:

kt:2:6138
kt:1:6123
kt:0:6137

The number of output lines depends on the number of partitions in the topic.
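Since the question asks for Java, the same sum the awk one-liners in other answers compute can be sketched by parsing those topic:partition:offset lines. This is a rough illustration (the class and method names are made up), using the sample output above:

```java
import java.util.Arrays;

public class OffsetSum {
    // Sums the offset field of GetOffsetShell lines ("topic:partition:offset").
    static long sumOffsets(String output) {
        return Arrays.stream(output.split("\n"))
                .map(line -> line.trim().split(":"))
                .filter(parts -> parts.length == 3)
                .mapToLong(parts -> Long.parseLong(parts[2]))
                .sum();
    }

    public static void main(String[] args) {
        String output = "kt:2:6138\nkt:1:6123\nkt:0:6137";
        // Caveat (see the comments below): this equals the message count only
        // if every partition's earliest offset is still 0, i.e. nothing has
        // been deleted by retention or compaction yet.
        System.out.println(sumOffsets(output)); // 18398
    }
}
```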

Cramfull answered 29/4, 2018 at 4:26 Comment(2)
If log compaction is enabled, then summing the offsets of the partitions may not give the exact count of messages in the topic.Reflection
This is not showing the total messages. This is showing the last offset. The initial one can be any number.Roeder
S
11

Use https://prestodb.io/docs/current/connector/kafka-tutorial.html

A distributed SQL engine, open-sourced by Facebook, that connects to several data sources (Cassandra, Kafka, JMX, Redis, ...).

PrestoDB runs as a server with optional workers (there is a standalone mode without extra workers); you then use a small executable JAR (the Presto CLI) to run queries.

Once you have configured the Presto server properly, you can use traditional SQL:

SELECT count(*) FROM TOPIC_NAME;
Surfactant answered 21/2, 2016 at 10:54 Comment(1)
this tool is nice, but it will not work if your topic name has more than 2 dots.Selfaddressed
S
9

Apache Kafka command to get unhandled messages on all partitions of a topic:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group

Prints:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

Column 6 contains the unhandled messages. Add them up like this:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
    END {print sum}'

awk reads the rows, skips the header line, sums the 6th column, and prints the total at the end.

Prints

5
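Since the question asks for Java, here is a hypothetical sketch of the same aggregation done in Java instead of awk (the class and method names are made up), assuming the tabular output shown above has been captured as a string:

```java
import java.util.Arrays;

public class LagSum {
    // Sums column 6 (Lag) of ConsumerOffsetChecker output, skipping the header row.
    static long sumLag(String output) {
        return Arrays.stream(output.split("\n"))
                .skip(1)                                    // skip the header line
                .map(line -> line.trim().split("\\s+"))
                .filter(cols -> cols.length >= 6)
                .mapToLong(cols -> Long.parseLong(cols[5])) // 6th column = Lag
                .sum();
    }

    public static void main(String[] args) {
        String output =
                "Group      Topic        Pid Offset          logSize         Lag             Owner\n"
              + "test_group test         0   11051           11053           2               none\n"
              + "test_group test         1   10810           10812           2               none\n"
              + "test_group test         2   11027           11028           1               none";
        System.out.println(sumLag(output)); // prints 5, matching the awk result above
    }
}
```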
Stipulate answered 31/8, 2016 at 14:0 Comment(0)
F
8

Run the following (assuming kafka-console-consumer.sh is on the path):

kafka-console-consumer.sh  --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true  \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10 | grep "Processed a total of"
Fathead answered 6/8, 2018 at 14:37 Comment(1)
Note: I removed the --new-consumer since that option is no longer available (or apparently necessary)Rinker
I
7

Using the Java client of Kafka 2.11-1.0.0, you can do the following:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }

The output is something like this:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
Isia answered 15/11, 2017 at 17:34 Comment(1)
I prefer your answer to @AutomatedMike's, since yours doesn't mess with the seekToEnd(..) and seekToBeginning(..) methods, which change the state of the consumer.Exempt
U
5

To get all the messages stored for the topic, you can seek the consumer to the beginning and end of the stream for each partition and sum the differences:

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
Urbanize answered 27/10, 2016 at 11:2 Comment(1)
btw, if you have compaction turned on then there may be gaps in the stream so the actual number of messages may be lower than the total calculated here. To get an accurate total you're going to have to replay the messages and count them.Urbanize
P
5

I had this same question and this is how I am doing it, from a KafkaConsumer, in Kotlin:

val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
    .map {
        it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
    }.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
    .first()

Very rough code, as I just got this to work, but basically you want to subtract the topic's beginning offset from the ending offset and this will be the current message count for the topic.

You can't rely on the end offset alone, because other configurations (cleanup policy, retention.ms, etc.) may end up causing the deletion of old messages from your topic. Offsets only "move" forward, so it is the beginning offset that moves forward toward the end offset (or eventually to the same value, if the topic contains no messages right now).

Basically, the end offset represents the overall number of messages that went through the topic, and the difference between the two represents the number of messages the topic contains right now.
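The same end-minus-beginning arithmetic can be sketched in Java (the question's language). In real code the two maps would come from KafkaConsumer#beginningOffsets and KafkaConsumer#endOffsets; here the summing logic is factored into a pure helper so it can be demonstrated with hypothetical offsets:

```java
import java.util.Map;

public class TopicMessageCount {
    // Given per-partition beginning and end offsets (as returned by
    // KafkaConsumer#beginningOffsets and KafkaConsumer#endOffsets),
    // returns the approximate number of messages currently in the topic.
    static <P> long count(Map<P, Long> beginningOffsets, Map<P, Long> endOffsets) {
        return endOffsets.entrySet().stream()
                .mapToLong(e -> e.getValue() - beginningOffsets.getOrDefault(e.getKey(), 0L))
                .sum();
    }

    public static void main(String[] args) {
        // Hypothetical offsets for a two-partition topic where old messages
        // in partition 0 have already been deleted by retention.
        Map<String, Long> beginning = Map.of("t-0", 40L, "t-1", 0L);
        Map<String, Long> end       = Map.of("t-0", 100L, "t-1", 25L);
        System.out.println(count(beginning, end)); // 60 + 25 = 85
    }
}
```

As noted above, the result is approximate for compacted topics, where the offset range can contain gaps.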

Parrot answered 20/5, 2020 at 15:30 Comment(0)
M
3

In most recent versions of Kafka Manager, there is a column titled Summed Recent Offsets.

(screenshot of Kafka Manager showing the Summed Recent Offsets column)

Maser answered 5/1, 2018 at 6:38 Comment(0)
M
3

Excerpts from Kafka docs

Deprecations in 0.9.0.0

The kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality.

I am running a Kafka broker with SSL enabled for both server and client. The commands I use are below:

kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

where /tmp/ssl_config is as below

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
Marcimarcia answered 20/8, 2018 at 9:25 Comment(0)
A
1

If you have access to the server's JMX interface, the start and end offsets are present at:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(replace TOPICNAME and PARTITIONNUMBER). Bear in mind that you need to check each replica of a given partition, or find out which broker is the leader for the partition (and this can change over time).

Alternatively, you can use Kafka Consumer methods beginningOffsets and endOffsets.
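If you go the JMX route from Java, the MBean names above can be built with javax.management.ObjectName and read through an MBeanServerConnection. Below is a sketch of just the name construction; the class and method names are made up, and connecting to the broker requires a JMX service URL (e.g. something like service:jmx:rmi:///jndi/rmi://broker:9999/jmxrmi) whose port depends on how JMX was enabled on the broker:

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class KafkaLogOffsetMBeans {
    // Builds the MBean name for a partition's log start or end offset,
    // matching the pattern kafka.log:type=Log,name=...,topic=...,partition=...
    static ObjectName logOffsetName(String which, String topic, int partition) {
        try {
            return new ObjectName(String.format(
                    "kafka.log:type=Log,name=%s,topic=%s,partition=%d",
                    which, topic, partition));
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        ObjectName start = logOffsetName("LogStartOffset", "mytopic", 0);
        ObjectName end   = logOffsetName("LogEndOffset", "mytopic", 0);
        System.out.println(start);
        System.out.println(end);
        // With a live broker you would read the "Value" attribute of each
        // MBean via an MBeanServerConnection and subtract start from end.
    }
}
```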

Adamek answered 20/8, 2018 at 15:18 Comment(2)
Let me see if I get this right: Enable JMX. Get all metrics. Pick a topic and a partition. For that topic/partition combo, get LogEndOffset and LogStartOffset. Do the difference. That's the number of messages in the queue. Correct?Waterage
If a topic has multiple partitions, then I need to do this math separately for each partition? Then add the results? (I'm new to Kafka, I've only used RabbitMQ before.)Waterage
C
0

If you need to calculate the result for all the consumers in a consumer group, (or for different consumer groups), another option is to use the admin client and subtract the consumer group offsets from the topic/partition offsets, code examples in Kotlin:

val topicName = "someTopic"
val groupId = "theGroupId"
val admin = Admin.create(kafkaProps.buildAdminProperties()) // Spring KafkaProperties
val parts = admin.describeTopics(listOf(topicName)).values()[topicName]!!.get().partitions()
val topicPartitionOffsets = admin.listOffsets(parts.associate { TopicPartition(topicName, it.partition()) to OffsetSpec.latest() }).all().get()
val consumerGroupOffsets = admin.listConsumerGroupOffsets(groupId)
    .partitionsToOffsetAndMetadata().get()
val highWaterMark = topicPartitionOffsets.map { it.value.offset() }.sum()
val consumerPos = consumerGroupOffsets.map { it.value.offset() }.sum()
val unProcessedMessages = highWaterMark - consumerPos

Also here is a working version of LeYAUable's example code which only uses a regular (non-admin) client:

val partitions = consumer.partitionsFor("topicName")
        .map { TopicPartition(it.topic(), it.partition()) }
val highWaterMark = consumer.endOffsets(partitions).values.sum()
val consumerPosition = consumer.beginningOffsets(partitions).values.sum()
val msgCount = highWaterMark - consumerPosition

This will only give you the offset for this particular consumer though! The usual caveat applies that this is imprecise when a topic is compacted.

Cowry answered 1/6, 2022 at 7:46 Comment(0)
R
0

The only precise way to get an accurate count is to read the messages with a consumer.

Alternatively, a (not fully accurate) way to get the closest number is to use the kafka.tools.GetOffsetShell class. In two separate shell commands, get the latest and earliest offsets for each partition using the time parameter (-1 for latest, -2 for earliest), then use a simple shell script to correlate the partitions and subtract the numbers. See the command examples below. Note that this may not be exact: in some rare circumstances a topic may be missing offset numbers in the series, compacted topics being one example, and in those cases this won't work.

Get the latest / end offset for a topic:

bin % ./kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic tpmqtt --time -1
tpmqtt:0:8
tpmqtt:1:0
tpmqtt:2:10

Get the earliest offset for a topic:

bin % ./kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic tpmqtt --time -2
tpmqtt:0:0
tpmqtt:1:0
tpmqtt:2:0

In the example value "tpmqtt:2:10", tpmqtt is the topic, 2 is the partition number, and 10 is the last offset in that partition.
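In Java (the question's language), correlating the two outputs amounts to parsing the topic:partition:offset lines and subtracting earliest from latest per partition. A rough sketch (class and method names are made up), using the sample outputs above:

```java
import java.util.HashMap;
import java.util.Map;

public class GetOffsetShellDiff {
    // Parses GetOffsetShell lines of the form "topic:partition:offset"
    // into a map of partition -> offset.
    static Map<Integer, Long> parse(String output) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (String line : output.split("\n")) {
            String[] parts = line.trim().split(":");
            if (parts.length == 3) {
                offsets.put(Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
            }
        }
        return offsets;
    }

    // Subtracts earliest from latest per partition and sums the differences.
    static long messageCount(String latestOutput, String earliestOutput) {
        Map<Integer, Long> latest = parse(latestOutput);
        Map<Integer, Long> earliest = parse(earliestOutput);
        return latest.entrySet().stream()
                .mapToLong(e -> e.getValue() - earliest.getOrDefault(e.getKey(), 0L))
                .sum();
    }

    public static void main(String[] args) {
        String latest   = "tpmqtt:0:8\ntpmqtt:1:0\ntpmqtt:2:10"; // --time -1
        String earliest = "tpmqtt:0:0\ntpmqtt:1:0\ntpmqtt:2:0";  // --time -2
        // An upper bound on the message count; see the caveats above about
        // missing offsets and compacted topics.
        System.out.println(messageCount(latest, earliest)); // 18
    }
}
```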

Roeder answered 29/5, 2023 at 7:41 Comment(0)
M
-1

I haven't tried this myself, but it seems to make sense.

You can also use kafka.tools.ConsumerOffsetChecker (source).

Mariandi answered 2/2, 2016 at 21:30 Comment(0)
S
-1

The simplest way I've found is to use the Kafdrop REST API /topic/topicName and send an Accept: application/json header in order to get back a JSON response.

This is documented here.

Syndesis answered 10/10, 2018 at 16:23 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.