As the title says, I want to get the number of records in my topic, and I can't find a solution using the kafka-python library. Does anyone have any idea?
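The main idea is to count how many messages there are in each partition of the topic and sum these numbers. The size of a partition is taken as the difference between its high and low watermark offsets, so the sum is the total number of messages on that topic (an upper bound if the topic is compacted, because compaction leaves gaps in the offsets). I am using confluent_kafka as the main library.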
from confluent_kafka import Consumer, TopicPartition
from concurrent.futures import ThreadPoolExecutor

consumer = Consumer({"bootstrap.servers": "localhost:6667", "group.id": "test"})

def get_partition_size(topic_name: str, partition_key: int):
    # Size of one partition = high watermark - low watermark
    topic_partition = TopicPartition(topic_name, partition_key)
    low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
    partition_size = high_offset - low_offset
    return partition_size

def get_topic_size(topic_name: str):
    # Fetch the topic metadata to learn which partitions exist
    topic = consumer.list_topics(topic=topic_name)
    partitions = topic.topics[topic_name].partitions
    workers, max_workers = [], len(partitions) or 1

    # Query every partition in parallel, one job per partition
    with ThreadPoolExecutor(max_workers=max_workers) as e:
        for partition_key in list(topic.topics[topic_name].partitions.keys()):
            job = e.submit(get_partition_size, topic_name, partition_key)
            workers.append(job)

    topic_size = sum([w.result() for w in workers])
    return topic_size

print(get_topic_size('my.kafka.topic'))
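Since the question asks about kafka-python, the same watermark idea can probably be expressed with that library too. What follows is only a sketch, not tested against a live broker: it assumes a broker reachable at whatever address you pass in, and the names count_from_offsets and get_topic_size_kafka_python are made up for this example. It relies on KafkaConsumer.partitions_for_topic, beginning_offsets and end_offsets, which return partition ids and per-partition offsets respectively.

```python
from typing import Dict, Hashable, Iterable


def count_from_offsets(beginning: Dict[Hashable, int], end: Dict[Hashable, int]) -> int:
    """Sum (end - beginning) over every partition present in `end`."""
    return sum(end[tp] - beginning[tp] for tp in end)


def get_topic_size_kafka_python(topic: str, bootstrap_servers: Iterable[str]) -> int:
    # Imported lazily so the pure helper above works without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers=list(bootstrap_servers))
    try:
        # partitions_for_topic returns a set of partition ids, or None if the topic is unknown.
        partition_ids = consumer.partitions_for_topic(topic) or set()
        partitions = [TopicPartition(topic, p) for p in partition_ids]
        if not partitions:
            return 0
        # Earliest available offset per partition.
        beginning = consumer.beginning_offsets(partitions)
        # Offset that the next produced message would get, per partition.
        end = consumer.end_offsets(partitions)
        return count_from_offsets(beginning, end)
    finally:
        consumer.close()
```

With a broker running, it would be called as get_topic_size_kafka_python('my.kafka.topic', ['localhost:9092']). No messages are consumed and no offsets are committed, since only offset metadata is requested.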
There is no specific API to count the number of records in a topic. You need to consume the topic and count the records you receive from the Kafka consumer.
One workaround is to send one message to each partition and read back its offset. From those offsets you can calculate the total number of messages sent to the topic so far.
But this is not the right approach: the offsets do not tell you how many messages consumers have already consumed or how many messages Kafka has already deleted. The only reliable way is to consume the messages and count them.
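To illustrate why offsets alone can mislead, here is a toy model in plain Python. No Kafka is involved, and the function name and sample offsets are invented for the illustration. It compares the offset arithmetic with the number of records still stored in one partition: retention that only trims the oldest records keeps the two equal, while log compaction leaves gaps, so the offset difference overcounts.

```python
from typing import List, Tuple


def offset_estimate_vs_actual(retained_offsets: List[int], next_offset: int) -> Tuple[int, int]:
    """Return (high - low, records actually stored) for one partition.

    retained_offsets: offsets of the records still stored, in ascending order.
    next_offset: the offset the next produced record would get (the high watermark).
    """
    if not retained_offsets:
        # An empty partition has low == high, so the estimate is 0 as well.
        return 0, 0
    low = retained_offsets[0]
    return next_offset - low, len(retained_offsets)


# Ten records were produced (offsets 0..9); retention deleted the first four.
print(offset_estimate_vs_actual([4, 5, 6, 7, 8, 9], 10))  # (6, 6)

# Same partition, but compaction also removed superseded keys in between.
print(offset_estimate_vs_actual([4, 6, 9], 10))  # (6, 3)
```

In the second case only consuming the partition would reveal that three records remain.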
bootstrap_servers = bootstrap_server_list.split(",")
consumer_kafka = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', consumer_timeout_ms=10000)
consumer_kafka.subscribe([topic])
for message in consumer_kafka:
    if len(message) > 0:
        print(message)
– Glynda
from kafka import KafkaConsumer
bootstrap_servers = ['localhost:9092']
consumer_kafka = KafkaConsumer(bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', consumer_timeout_ms=10000)
consumer_kafka.subscribe(['test'])
for message in consumer_kafka:
    if len(message) > 0:
        print(message)
And it is working fine and consuming all the messages. You can look into this code and I hope it will help. – Masry
I wasn't able to get this working with kafka-python, but I was able to do it fairly easily with the confluent-kafka library:
from confluent_kafka import Consumer

topic = "test_topic"
broker = "localhost:9092"

def get_count():
    consumer = Consumer({
        'bootstrap.servers': broker,
        'group.id': 'my-group',
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe([topic])
    total_message_count = 0
    while True:
        # poll() returns None when no message arrives within the timeout (1 second here)
        msg = consumer.poll(1.0)
        if msg is None:
            print("No more messages")
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        total_message_count = total_message_count + 1
        print('Received message {}: {}'.format(total_message_count,
                                               msg.value().decode('utf-8')))
    consumer.close()
    print(total_message_count)
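For anyone who wants to stay with kafka-python, a consume-and-count loop along the same lines might look like the sketch below. It is untested against a live broker, and the names count_records and count_topic_kafka_python as well as the idle_timeout_ms parameter are hypothetical. It assumes that consumer_timeout_ms ends the iteration once no record has arrived for that long, and that group_id=None keeps the consumer from committing offsets, so counting does not move any consumer group's position.

```python
from typing import Iterable


def count_records(records: Iterable) -> int:
    """Count items by iterating; works for any iterable, including a KafkaConsumer."""
    return sum(1 for _ in records)


def count_topic_kafka_python(topic: str, bootstrap_servers: Iterable[str],
                             idle_timeout_ms: int = 10000) -> int:
    # Imported lazily so count_records can be used without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=list(bootstrap_servers),
        group_id=None,                        # no consumer group, so no offsets are committed
        auto_offset_reset="earliest",         # start from the oldest retained record
        enable_auto_commit=False,
        consumer_timeout_ms=idle_timeout_ms,  # stop iterating after this long without a record
    )
    try:
        return count_records(consumer)
    finally:
        consumer.close()
```

A call would look like count_topic_kafka_python('test', ['localhost:9092']). The result is only a snapshot: if producers are still writing, or the idle timeout is shorter than a pause in delivery, the count can come out low.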
Does poll pull the message from the queue? – Marjoriemarjory