Kafka not receiving messages when indicating group_id in Python
Asked Answered
D

3

6

I am using Kafka (kafka-python) version 3.0.0-1.3.0.0.p0.40. I need to configure the consumer for the topic 'simulation' in Python. When I don't indicate the group_id, i.e. group_id = None it receives messages fine. However if I indicate the group_id, it doesn't receive any messages.

Here's my code in Python:

consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
                         group_id = 'myTestGroupID', enable_auto_commit = True)
consumer.subscribe(['simulation'])
# not using assign method here as auto_commit is enabled
# partitions = [TopicPartition('simulation',num) for num in range(0,9)]
# consumer.assign([TopicPartition('simulation', partitions[0])])

while not self.stop_event.is_set():
    for message in consumer:
        print(message)

I tried to search for some default values of group_id in consumer properties files, I've found one cloudera_mirrormaker however nothing changed. I will need to use multiple consumers therefore it's important that I have a group_id and they share the same group_id. In many sources I've found that the group_id can be any string...

When I run the consumer for this topic in the console it works and receives messages

./kafka-console-consumer.sh --bootstrap-server XXX.XXX.XXX.XXX:9092 --topic simulation --from-beginning --consumer-property group.id=myTestGroupID  --partition 0

when I'm running kafka-consumer-groups.sh to list all available groups it's empty.

If anyone has an idea why it's stuck in Python, it would be so much appreciated. Thanks a lot

Here is code for producer (I've reduced it for simplicity as in this case it doesn't change the problem)

from kafka import KafkaProducer
class Producer(threading.Thread):
    ...
    def run(self):
        producer = KafkaProducer(bootstrap_servers='XXX.XXX.XXX.XXX:9092')
        while not self.stop_event.is_set():
            string = 'test %s' %time.time()
            producer.send('simulation', string.encode())
            time.sleep(0.5)
        producer.close()
Didynamous answered 21/9, 2018 at 0:48 Comment(7)
can you describe that group on kafka bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-groupKuhns
If you consumed all the offsets in the console for the same group, then python wouldn't get any messages, otherwise, what is self.stop_event.is_set()? Please show a minimal reproducible exampleRichardson
@Deadpool it always gives me this Error: Executing consumer group command failed due to The consumer group command timed out while waiting for group to initializeDidynamous
@stovfl I've edited my question by adding the producer part. I've simplified it as it doesn't change the problem. I've just checked, I have kafka-python version 1.4.3 it definitely should support thisDidynamous
The Error from groups.sh --describe should not happend. Retry with a different topic and group_id. Also simplify your consumer testcase to use it without the while not... part.Continuo
Any luck on this? I've got the same issue.Bitstock
@Bitstock yeah I've finally solved it. That was my issue: omkafka config file partitions.number attr was 1 by default, we changed it to 100 as was needed and it started working! I hope it will help youDidynamous
D
2

I've finally solved it.

That was my issue: omkafka config file partitions.number attr was 1 by default.

We changed it to 100 as was needed and it started working! I hope it will help you

Didynamous answered 14/9, 2021 at 11:26 Comment(0)
C
0

Question: Kafka not receiving messages when indicating group_id


Try, pass the 'topic' on KafkaConsumer instantiating, like in the Documentation:

# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer
consumer = KafkaConsumer('simulation', group_id='myTestGroupID')
for msg in consumer:
    print (msg)

The Documentation: KafkaConsumer is clear about the type of group-id:

group_id (str or None) – The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None

Continuo answered 21/9, 2018 at 8:36 Comment(1)
I've tried this too indicating the topic in when instantiating but it didn't help... from kafka import KafkaConsumer consumer = KafkaConsumer('simulation', group_id='myTestGroupID',bootstrap_servers='XXX.XXX.XXX.XXX:9092', enable_auto_commit = True) for msg in consumer: print (msg)Didynamous
P
0

I had the same issue, partially not receiving messages (most of the messages are lost) when dealing with a high latency environment and large messages (>1Mb).

I did not put a lot of effort on finding the root cause but my guess is that consumer re-balancing is initiated before the message processing is done, which seems to cause issues when no other consumer is available (in my case i had a single consumer or two consumers that suffered the same issue).

What did the trick for me was increasing max_poll_interval_ms and setting max_poll_records=1

consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
                     group_id = 'myTestGroupID', 
                     enable_auto_commit = True,
                     max_poll_interval_ms=5000,
                     max_poll_records=1)

you can find more info at: https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html at the Detecting Consumer Failures section.

Psycho answered 7/7, 2020 at 11:17 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.