How to stop a Python Kafka consumer in a program?

I am writing a Python Kafka consumer (trying to use kafka.consumer.SimpleConsumer or kafka.consumer.simple.SimpleConsumer from http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html). When I run the following code, it keeps running forever, even after all messages have been consumed. I would like the consumer to stop once it has consumed all the messages. How can I do that? I also have no idea how to use the stop() function (defined in the base class kafka.consumer.base.Consumer).

UPDATE

I used a signal handler to call consumer.stop(). Some error messages were printed to the screen, but the program was still stuck in the for loop: when new messages came in, the consumer consumed and printed them. I also tried client.close(), with the same result.

I need a way to stop the for loop gracefully.

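    from __future__ import print_function  # lets print() behave the same on Python 2
    from kafka import KafkaClient, SimpleConsumer  # legacy kafka-python 0.9.x API

    client = KafkaClient("localhost:9092")
    consumer = SimpleConsumer(client, "test-group", "test")

    consumer.seek(0, 2)  # whence=2: jump to the end of the topic; seek(0, 0) jumps to the start

    for message in consumer:  # keeps waiting for new messages and never exits
        print("Offset:", message.offset)
        print("Value:", message.message.value)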

Any help is welcome. Thanks.

Woofer answered 5/8, 2015 at 19:27
Comment: If you want the loop to end once all messages have been consumed, pass the consumer_timeout_ms parameter to the KafkaConsumer constructor. If no new message arrives in the topic within that many milliseconds, your for loop ends cleanly. More info here: (https://mcmap.net/q/753319/-how-to-close-kafka-consumer-once-all-messages-are-consumed) – Alkylation
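A minimal sketch of the comment's suggestion, assuming the newer kafka-python `KafkaConsumer` API, a broker at `localhost:9092`, and that a 10-second gap means the topic has been drained. With `consumer_timeout_ms` set, the iterator stops on its own after that much silence, so a plain `for` loop ends. The helper names `make_consumer` and `drain` are hypothetical.

```python
def make_consumer(topic="test", servers="localhost:9092", idle_ms=10000):
    """Build a KafkaConsumer whose iterator ends after idle_ms with no new message.

    Hypothetical helper; needs kafka-python installed and a reachable broker.
    """
    from kafka import KafkaConsumer  # imported lazily so drain() can be used without Kafka

    return KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        group_id="test-group",
        auto_offset_reset="earliest",  # start from the oldest message if nothing is committed
        consumer_timeout_ms=idle_ms,   # stop iterating after idle_ms of silence
    )


def drain(consumer):
    """Consume until the consumer's iterator stops; return (offset, value) pairs."""
    seen = []
    for message in consumer:  # ends once consumer_timeout_ms elapses without a message
        seen.append((message.offset, message.value))
    return seen
```

Usage: `consumer = make_consumer(); records = drain(consumer); consumer.close()`. Calling `close()` after the loop also gives kafka-python a chance to commit pending offsets when auto-commit is enabled.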

Use the iter_timeout parameter to set the waiting time. If it is set to 10, as in the following code, iteration ends when no new message arrives within 10 seconds. The default value is None, which means the consumer blocks forever waiting for new messages.

    # iter_timeout is in seconds; None (the default) means wait forever
    self.consumer = SimpleConsumer(self.client, "test-group", "test",
                                   iter_timeout=10)

Update

The above is not a good method. While messages keep arriving, there may never be a pause as long as iter_timeout, so it is hard to pick a timeout that guarantees the loop stops. So now I am using the get_message() function, which tries to consume one message and returns None when there is no new message.
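A minimal sketch of the get_message() approach, assuming the legacy kafka-python `SimpleConsumer`, whose `get_message(block=True, timeout=...)` waits up to `timeout` seconds and returns `None` if nothing arrives. The function name `consume_until_none` and the 1-second timeout are illustrative choices.

```python
def consume_until_none(consumer, timeout=1.0):
    """Read messages one at a time until get_message() returns None.

    `consumer` is assumed to be a legacy kafka-python SimpleConsumer (or any
    object with the same get_message(block, timeout) method).
    """
    consumed = []
    while True:
        # Wait up to `timeout` seconds for a single message.
        message = consumer.get_message(block=True, timeout=timeout)
        if message is None:  # nothing new arrived within the timeout
            break
        # SimpleConsumer yields OffsetAndMessage(offset, message) tuples.
        consumed.append((message.offset, message.message.value))
    return consumed
```

Each call waits at most `timeout` seconds, so the loop ends at the first gap of that length between messages.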

Woofer answered 6/8, 2015 at 15:52

We can first check the offset of the last message in the topic, then stop the loop once we have reached that offset.

    client = "localhost:9092"
    consumer = KafkaConsumer(client)
    topic = 'test'
    tp = TopicPartition(topic,0)
    #register to the topic
    consumer.assign([tp])

    # obtain the last offset value
    consumer.seek_to_end(tp)
    lastOffset = consumer.position(tp)

    consumer.seek_to_beginning(tp)        

    for message in consumer:
        print "Offset:", message.offset
        print "Value:", message.message.value
        if message.offset == lastOffset - 1:
            break
Caffeine answered 1/8, 2017 at 6:4
Comment: One problem with this method is that if there are only a few messages, the offset will never be committed, because the auto-commit interval is never reached, and breaking out of the loop skips the commit. – Projectionist
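One way to address the comment's concern, sketched under the assumption that the consumer was created with a `group_id` (kafka-python's `commit()` needs one): commit explicitly after breaking out of the loop instead of relying on the auto-commit interval. The function name `read_to_offset` is hypothetical.

```python
def read_to_offset(consumer, last_offset):
    """Iterate up to the record at last_offset - 1, then commit.

    Assumes `consumer` is a kafka-python KafkaConsumer with a group_id,
    assigned to a single partition, positioned at its start, and with at
    least one record before last_offset (otherwise the loop blocks).
    """
    values = []
    for message in consumer:
        values.append(message.value)
        if message.offset >= last_offset - 1:
            break
    # Commit synchronously so progress is stored even if the
    # auto-commit interval never elapsed during the loop.
    consumer.commit()
    return values
```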

Simpler Solution:

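Use poll() instead, with a timeout (the timeout_ms argument in kafka-python). poll() does not block indefinitely: it waits at most timeout_ms milliseconds and returns whatever records are available, possibly none.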

  • Create a counter variable outside your while loop.
  • Increment the counter every time poll() fetches 0 records from the Kafka brokers.
  • Reset the counter to 0 whenever poll() returns records.
  • If the counter reaches some threshold (say 10), break out of the loop and close the consumer.

This relies on the assumption that if poll() fetched no records in 10 consecutive calls, all the data has been read.
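The steps above can be sketched as follows, assuming kafka-python's `KafkaConsumer.poll(timeout_ms=...)`, which returns a dict mapping each `TopicPartition` to a list of records. The names `consume_until_idle` and `max_empty_polls`, and the 1000 ms timeout, are illustrative.

```python
def consume_until_idle(consumer, max_empty_polls=10, timeout_ms=1000):
    """Poll until max_empty_polls consecutive polls return no records, then close."""
    records = []
    empty_polls = 0  # consecutive polls that fetched nothing
    while empty_polls < max_empty_polls:
        batch = consumer.poll(timeout_ms=timeout_ms)  # {TopicPartition: [records]}
        fetched = [r for partition_records in batch.values() for r in partition_records]
        if not fetched:
            empty_polls += 1
        else:
            empty_polls = 0  # got data, so reset the counter
            records.extend(fetched)
    consumer.close()
    return records
```

Flattening the batch before testing it treats a dict of empty lists the same as an empty dict.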

Movie answered 21/10, 2021 at 15:35

A similar solution to Mohit's answer, but using the consumer's end_offsets() function.

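    from kafka import KafkaConsumer, TopicPartition

    # settings
    servers = "localhost:9092"
    topic = 'test'

    # prepare consumer (bootstrap_servers must be passed as a keyword)
    tp = TopicPartition(topic, 0)
    consumer = KafkaConsumer(bootstrap_servers=servers)
    consumer.assign([tp])
    consumer.seek_to_beginning(tp)

    # obtain the offset one past the last message
    lastOffset = consumer.end_offsets([tp])[tp]

    # skip the loop for an empty partition, otherwise it would block forever
    if consumer.position(tp) < lastOffset:
        for message in consumer:
            print("Offset:", message.offset)
            print("Value:", message.value)  # ConsumerRecord exposes .value directly
            if message.offset >= lastOffset - 1:
                break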
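The answer above handles only partition 0. A sketch of the same end_offsets() idea for several partitions, assuming a kafka-python version that provides `beginning_offsets()` and `end_offsets()`, and that `partitions` is a list of `TopicPartition` objects already passed to `assign()`. The function name `read_all_partitions` is hypothetical; it polls instead of iterating so it can track each partition separately.

```python
def read_all_partitions(consumer, partitions, timeout_ms=1000):
    """Read every assigned partition up to its end offset at call time.

    Assumes `consumer` is a kafka-python KafkaConsumer already assigned to
    `partitions` (a list of TopicPartition objects).
    """
    start = consumer.beginning_offsets(partitions)  # {tp: first available offset}
    end = consumer.end_offsets(partitions)          # {tp: offset one past the last message}
    # Partitions with nothing to read are done immediately; otherwise the loop would never end.
    remaining = {tp for tp in partitions if end[tp] > start[tp]}
    consumer.seek_to_beginning(*partitions)

    records = []
    while remaining:
        for tp, batch in consumer.poll(timeout_ms=timeout_ms).items():
            for record in batch:
                if record.offset < end[tp]:  # ignore messages produced after the snapshot
                    records.append(record)
                if record.offset >= end[tp] - 1:
                    remaining.discard(tp)
    return records
```

Like the answer, this assumes the record at offset `end - 1` is actually delivered; if it might not be, comparing `consumer.position(tp)` with `end[tp]` is a possible alternative stop condition.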
Annoyance answered 10/1, 2019 at 10:27

© 2022 - 2025 — McMap. All rights reserved.