How to close a Kafka consumer once all messages are consumed?

I have the following program to consume all the messages arriving on a Kafka topic.

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'])
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
consumer.close()

Using the above program I am able to consume all the messages coming to Kafka. But once all messages are consumed, I want to close the Kafka consumer, which is not happening. I need help with this.

Hube answered 17/7, 2017 at 12:26 Comment(0)
H
21

I am now able to close the Kafka consumer by passing the consumer_timeout_ms argument to the KafkaConsumer constructor. It takes a timeout value in milliseconds. Below is the code snippet.

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'],
                         consumer_timeout_ms=1000)
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
consumer.close()

In the above code, if the consumer doesn't receive any message for 1 second, the for loop stops iterating, so execution reaches consumer.close() and the session is closed.
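The same idle-timeout idea can also be written with an explicit poll loop, which makes the exit condition visible and guarantees the consumer is closed even if processing fails. This is only a sketch: drain_with_poll and handle are hypothetical names, and it assumes the consumer object offers kafka-python's poll(timeout_ms=...) (returning a dict of partition to records, empty when nothing arrived) and close().

```python
def drain_with_poll(consumer, handle, idle_timeout_ms=1000):
    """Process records until one poll returns nothing, then close the consumer.

    Returns the number of records handled.
    """
    count = 0
    try:
        while True:
            # poll() returns {TopicPartition: [records]}; an empty dict means
            # no record arrived within idle_timeout_ms.
            batches = consumer.poll(timeout_ms=idle_timeout_ms)
            if not batches:
                break
            for records in batches.values():
                for record in records:
                    handle(record)
                    count += 1
    finally:
        # Runs on normal exit and on errors raised by handle().
        consumer.close()
    return count


def example():
    # Not called automatically; needs kafka-python and a reachable broker.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my_test_topic',
                             group_id='my-group',
                             bootstrap_servers=['my_kafka:9092'])
    drain_with_poll(consumer, lambda r: print(r.topic, r.key, r.value))
```

As with consumer_timeout_ms, an empty poll only means nothing arrived within the timeout; it does not prove the producer is finished.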

Hube answered 19/7, 2017 at 7:8 Comment(1)
Thanks for the answer, seems like you want to call consumer.close() instead of KafkaConsumer.close(). - Ammonite
S
3

The Kafka configuration parameter enable.partition.eof is what you need. When it is set to true, the consumer emits a PARTITION_EOF event whenever it reaches the end of a partition, so you can detect that point in your poll loop or callback and close the consumer once every assigned partition has reported it. Note that this is a librdkafka setting (used by clients such as confluent-kafka), not a kafka-python option.
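A rough sketch of how this could look with the confluent-kafka client follows. The helper name consume_until_eof and the handle callback are hypothetical, and the EOF error code is passed in as a parameter so the helper has no hard dependency on the library; with confluent-kafka it would be KafkaError._PARTITION_EOF.

```python
def consume_until_eof(consumer, handle, eof_code, poll_timeout=1.0):
    """Handle messages until every assigned partition has reported EOF.

    Returns the number of messages handled.
    """
    finished = set()  # (topic, partition) pairs that have reported EOF
    count = 0
    try:
        while True:
            msg = consumer.poll(poll_timeout)
            if msg is None:
                continue  # nothing yet, or partitions not assigned yet
            err = msg.error()
            if err is None:
                handle(msg)
                count += 1
                # New data arrived, so this partition is no longer at its end.
                finished.discard((msg.topic(), msg.partition()))
                continue
            if err.code() != eof_code:
                raise RuntimeError(str(err))
            finished.add((msg.topic(), msg.partition()))
            assigned = {(tp.topic, tp.partition)
                        for tp in consumer.assignment()}
            if assigned and assigned <= finished:
                break
    finally:
        consumer.close()
    return count


def example():
    # Not called automatically; needs confluent-kafka and a reachable broker.
    from confluent_kafka import Consumer, KafkaError

    consumer = Consumer({'bootstrap.servers': 'my_kafka:9092',
                         'group.id': 'my-group',
                         'auto.offset.reset': 'earliest',
                         'enable.partition.eof': True})
    consumer.subscribe(['my_test_topic'])
    consume_until_eof(consumer, lambda m: print(m.value()),
                      KafkaError._PARTITION_EOF)
```

Reaching EOF only means the consumer has caught up with what is currently in the partition; a producer may still append more afterwards.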

Stripe answered 5/7, 2018 at 10:31 Comment(1)
to consume all the messages, not just the first 100 ones (you should look into enumerate, which will give you an index with the value, so you don't have to use a manual counter). - Obliterate
D
2

It looks like you want consumer.close() instead of KafkaConsumer.close(). It's not documented as a static method.

Dreg answered 17/7, 2017 at 12:30 Comment(2)
I tried using consumer.close() as well, but it seems the code never leaves the for loop to execute close(). Maybe I am missing something; I'm not sure what. - Hube
I think your loop is just going to wait for more messages indefinitely. You'll have to give it an exit condition to reach the close. - Dreg
A
0

I think the accepted answer here is not exactly accurate, so here is my take on this:

You can just add a condition and if it is met you can break the for loop:

for message in consumer:
    if condition:
        break

In your case, you want to stop when all messages are consumed, so you have to find a way to tell the consumer that all messages have arrived.

For example, you could produce a message which could have that information in it, and then your condition would be checking if the message consumed is the one reporting all messages have arrived.
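A minimal sketch of that sentinel approach, assuming the producer sends a final message whose value is an agreed marker. The marker b"__END__" and the names consume_until_sentinel and handle are made up for illustration, and message.value is assumed to be raw bytes (no deserializer configured).

```python
def consume_until_sentinel(consumer, handle, sentinel=b"__END__"):
    """Handle messages until the sentinel value is seen, then close.

    Returns the number of messages handled (the sentinel is not counted).
    """
    count = 0
    try:
        for message in consumer:
            if message.value == sentinel:
                break  # producer signalled that everything has been sent
            handle(message)
            count += 1
    finally:
        consumer.close()
    return count
```

Kafka only guarantees ordering within a partition, so on a topic with several partitions the sentinel can be read before earlier messages from other partitions. This works cleanly for a single-partition topic, or with one sentinel per partition.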

Another option, mentioned here before, is to assume that if no message arrives for a certain amount of time (1 second was suggested, but a few more seconds might be safer), no more messages are coming.

The way I did it was to check that every ID I expected had been received at least once (ignoring duplicates). That requires knowing exactly what you will receive, plus some logic that is probably beyond the scope of this question, but I found it a useful and elegant way to decide when to stop consuming. Here is some of the code you would need:

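# Assumes message.value is a dict, e.g. produced by a JSON value_deserializer.
received = 0  # renamed from "sum" to avoid shadowing the built-in
data = {
    0: None,
    1: None,
    2: None,
    3: None
}
for message in consumer:
    payload = message.value
    unique_id = payload["unique_id"]
    # Count each expected ID once; ignore duplicates and unknown IDs.
    if unique_id in data and data[unique_id] is None:
        data[unique_id] = payload
        received += 1
    if len(data) == received:
        break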

A much easier way, if you know how many messages you will be consuming, is to use enumerate like this:

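amount_of_messages_to_be_consumed = 40  # as an example
# start=1 so the loop stops right after the 40th message instead of
# blocking while it waits for a 41st.
for index, message in enumerate(consumer, start=1):
    if index == amount_of_messages_to_be_consumed:
        break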

Of course, after you break out of the for loop you can and should close the consumer (but you were probably just stuck on getting out of the endless for loop):

consumer.close()
Adventist answered 18/9, 2023 at 21:12 Comment(0)
