How to count the number of records (messages) in a topic using kafka-python

As the title says, I want to get the number of records in my topic, but I can't find a way to do it with the kafka-python library. Does anyone have an idea?

Glynda asked 7/10, 2020 at 15:46 (3 comments)
Does this answer your question? Counting Number of messages stored in a kafka topic (Dermatologist)
Yes @mike, this solution is very similar. I'll try to reproduce it in Python (I'm not very good at Java). (Glynda)
I can't use the command line; in my project I need to use the Python Kafka API. (Glynda)

The main idea is to compute how many messages each partition of the topic holds (its high watermark offset minus its low watermark offset) and sum these numbers over all partitions. The result is the total number of messages in the topic. I am using confluent_kafka as the main library.

from confluent_kafka import Consumer, TopicPartition
from concurrent.futures import ThreadPoolExecutor

consumer = Consumer({"bootstrap.servers": "localhost:6667", "group.id": "test"})

def get_partition_size(topic_name: str, partition_key: int) -> int:
    # Offsets currently retained in one partition: high watermark - low watermark
    topic_partition = TopicPartition(topic_name, partition_key)
    low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
    return high_offset - low_offset

def get_topic_size(topic_name: str) -> int:
    # Fetch the topic metadata to discover its partition ids
    topic = consumer.list_topics(topic=topic_name)
    partitions = topic.topics[topic_name].partitions
    max_workers = len(partitions) or 1

    # Query the watermark offsets of all partitions in parallel
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        workers = [executor.submit(get_partition_size, topic_name, partition_key)
                   for partition_key in partitions]

    # The topic size is the sum of the partition sizes
    return sum(w.result() for w in workers)

print(get_topic_size('my.kafka.topic'))
consumer.close()
Spigot answered 9/8, 2022 at 14:02 (1 comment)
If compaction is enabled, the high - low calculation will give you the wrong answer. (Cru)
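Since the question asks specifically about kafka-python, the same watermark idea can be sketched with that library's `KafkaConsumer.partitions_for_topic()`, `beginning_offsets()` and `end_offsets()`. This is a minimal sketch, assuming a reachable broker at `localhost:9092` (an assumption, as is the topic name); the helper names `size_from_offsets` and `get_topic_size` are my own. Like the answer above, it counts offsets rather than records, so it overcounts on compacted topics and on topics written with transactions, whose commit markers also occupy offsets.

```python
from typing import Dict, Hashable


def size_from_offsets(beginning: Dict[Hashable, int], end: Dict[Hashable, int]) -> int:
    """Sum (end offset - beginning offset) over all partitions."""
    return sum(end[tp] - beginning[tp] for tp in end)


def get_topic_size(topic_name: str, bootstrap_servers: str = "localhost:9092") -> int:
    # kafka-python is imported here so size_from_offsets() does not depend on it
    from kafka import KafkaConsumer, TopicPartition

    # No group_id: only metadata and offsets are read, nothing is consumed or committed
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, enable_auto_commit=False)
    try:
        # Partition ids of the topic (None if the topic is unknown)
        partition_ids = consumer.partitions_for_topic(topic_name) or set()
        partitions = [TopicPartition(topic_name, p) for p in partition_ids]
        # Earliest retained offset and end offset (high watermark) of every partition
        beginning = consumer.beginning_offsets(partitions)
        end = consumer.end_offsets(partitions)
        return size_from_offsets(beginning, end)
    finally:
        consumer.close()
```

Usage: `print(get_topic_size("my.kafka.topic"))`. Because the consumer has no group_id, running it does not move any consumer group's position.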

There is no specific API to count the number of records in a topic. You need to consume the topic and count the records you receive from the Kafka consumer.

Milwaukee answered 7/10, 2020 at 15:53 (1 comment)
No, you don't need to consume; you can use the watermark offsets to calculate the size of the topic. See the answer by @Spigot. (Twombly)

One option is to send one message to each partition and read back the offset it was written at. From those offsets you can calculate the total number of messages sent to the topic so far.

But this is not the right approach: you don't know how many messages consumers have already consumed, or how many messages Kafka has already deleted. The only reliable way is to consume the messages and count them.

Masry answered 7/10, 2020 at 16:00 (3 comments)
When I consume messages from my topic, it returns only one message. Do you know how I can consume all the messages? My consumer function:

bootstrap_servers = bootstrap_server_list.split(",")
consumer_kafka = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', consumer_timeout_ms=10000)
consumer_kafka.subscribe([topic])
for message in consumer_kafka:
    if len(message) > 0:
        print(message)

(Glynda)
I am not sure how to consume messages in kafka-python, but I tried running the following:

from kafka import KafkaConsumer
bootstrap_servers = ['localhost:9092']
consumer_kafka = KafkaConsumer(bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', consumer_timeout_ms=10000)
consumer_kafka.subscribe(['test'])
for message in consumer_kafka:
    if len(message) > 0:
        print(message)

It works fine and consumes all the messages. You can look into this code; I hope it helps. (Masry)
This actually goes into an infinite loop. How do we get out of it? (Stairhead)
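If you do want to consume and count with kafka-python, note that iterating over a `KafkaConsumer` blocks indefinitely unless `consumer_timeout_ms` is set (its default is no timeout), so a `for` loop over it never ends on its own. The sketch below assumes a broker at `localhost:9092`; the helper names `count_records` and `count_by_consuming` are hypothetical. It assigns every partition explicitly, seeks to the beginning so the starting point does not depend on committed offsets, and stops after `idle_timeout_ms` without a new record. Records produced while it runs are counted too, and a topic that keeps receiving records will keep it running.

```python
from typing import Iterable


def count_records(records: Iterable) -> int:
    """Count the records yielded by an iterable such as a KafkaConsumer."""
    return sum(1 for _ in records)


def count_by_consuming(topic_name: str, bootstrap_servers: str = "localhost:9092",
                       idle_timeout_ms: int = 10000) -> int:
    # kafka-python is imported here so count_records() can be used without it
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=False,             # no group_id, so nothing is committed
        consumer_timeout_ms=idle_timeout_ms,  # stop iterating after this long without a record
    )
    try:
        partition_ids = consumer.partitions_for_topic(topic_name) or set()
        partitions = [TopicPartition(topic_name, p) for p in partition_ids]
        if not partitions:
            return 0
        consumer.assign(partitions)              # manual assignment instead of subscribe()
        consumer.seek_to_beginning(*partitions)  # read from the oldest retained record
        return count_records(consumer)
    finally:
        consumer.close()
```

Unlike the watermark approach, this reads every record, so compacted-away records and transaction markers are not counted, at the cost of transferring the whole topic.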

I wasn't able to get this working with kafka-python, but I was able to do it fairly easily with the confluent-kafka library:

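from confluent_kafka import Consumer

topic = "test_topic"
broker = "localhost:9092"

def get_count():
    consumer = Consumer({
        'bootstrap.servers': broker,
        'group.id': 'my-group',
        'auto.offset.reset': 'earliest',
        # Don't commit offsets, so this counting run doesn't move the group's position
        'enable.auto.commit': False,
    })

    consumer.subscribe([topic])

    total_message_count = 0
    while True:
        # None means no message arrived within 1 s; this can also happen
        # before the group has been assigned its partitions
        msg = consumer.poll(1.0)

        if msg is None:
            print("No more messages")
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        total_message_count += 1
        value = msg.value()  # None for records with a null value (tombstones)
        print('Received message {}: {}'.format(
            total_message_count, value.decode('utf-8') if value is not None else None))

    consumer.close()

    print(total_message_count)
    return total_message_count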
Encyclopedia answered 19/10, 2021 at 23:46 (1 comment)
Doesn't poll() pull the message off the queue? (Marjoriemarjory)
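Regarding the question in the comment above: poll() does not delete anything from Kafka; it only advances this consumer's position. If that position is committed (`enable.auto.commit` defaults to true), a later run with the same `group.id` resumes from there and may count nothing. A minimal sketch of a bounded alternative, assuming confluent-kafka and a broker at `localhost:9092`: it assigns every partition from `OFFSET_BEGINNING`, disables auto-commit, records each partition's watermarks up front, and stops once every partition has reached its recorded high watermark rather than at the first empty poll. The function names and the group id `"count-only"` are hypothetical; `max_idle_polls` is a safety net for partitions whose last offsets hold no readable record, such as transaction markers.

```python
from typing import Dict, Tuple


def count_until_watermarks(consumer, watermarks: Dict[int, Tuple[int, int]],
                           poll_timeout: float = 1.0, max_idle_polls: int = 10) -> int:
    """Poll until each partition reaches the high watermark captured up front.

    `watermarks` maps partition id -> (low, high). Records at or past `high`
    (produced after the count started) are not counted.
    """
    remaining = {p for p, (low, high) in watermarks.items() if high > low}
    count = idle = 0
    while remaining and idle < max_idle_polls:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            idle += 1  # nothing arrived within poll_timeout
            continue
        idle = 0
        if msg.error():
            continue  # skip error events; they carry no record
        p, offset = msg.partition(), msg.offset()
        high = watermarks[p][1]
        if offset < high:
            count += 1
        if offset + 1 >= high:
            remaining.discard(p)  # this partition is fully read
    return count


def count_topic_messages(topic_name: str, broker: str = "localhost:9092") -> int:
    # Imported here so count_until_watermarks() does not need confluent-kafka
    from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

    consumer = Consumer({
        "bootstrap.servers": broker,
        "group.id": "count-only",      # hypothetical group name
        "enable.auto.commit": False,   # never commit, so no group position is moved
    })
    try:
        partition_ids = consumer.list_topics(topic=topic_name).topics[topic_name].partitions
        watermarks = {p: consumer.get_watermark_offsets(TopicPartition(topic_name, p))
                      for p in partition_ids}
        # Manual assignment from the start of every partition: no rebalance to wait for
        consumer.assign([TopicPartition(topic_name, p, OFFSET_BEGINNING) for p in partition_ids])
        return count_until_watermarks(consumer, watermarks)
    finally:
        consumer.close()
```

Usage: `print(count_topic_messages("test_topic"))`.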

© 2022 - 2024 — McMap. All rights reserved.