kafka-python consumer not receiving messages

I am having trouble getting KafkaConsumer to read from the beginning, or from any other explicit offset.

Running the command-line consumer tool for the same topic, I do see messages with the --from-beginning option, and it hangs otherwise:

$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning

If I run it through Python, it hangs, which I suspect is caused by incorrect consumer configs:

consumer = KafkaConsumer(topic_name,
                         bootstrap_servers=['localhost:9092'],
                         group_id=None,
                         auto_commit_enable=False,
                         auto_offset_reset='smallest')

print "Consuming messages from the given topic"
for message in consumer:
    print "Message", message
    if message is not None:
        print message.offset, message.value

print "Quit"

Output:

Consuming messages from the given topic (hangs after that)

I am using kafka-python 0.9.5 and the broker runs Kafka 0.8.2. Not sure what the exact problem is.

Set group_id=None as suggested by dpkp to emulate the behavior of the console consumer.

Violaceous answered 5/2, 2016 at 6:28 Comment(5)
I recently downloaded the kafka package and tried your code and it works for me. Can you show the contents of your consumer.properties file?Savoy
#34684910 you may need to set start offset...Pontic
Tried setting the starting offset too, it didn't help either.Violaceous
I was testing it with a topic with more than one partition, and it so happens that the issue arises only when the producer does not produce enough messages for all partitions to have at least one message in them. issues.apache.org/jira/browse/KAFKA-3159 The consumer works fine if all the partitions have at least one message.Violaceous
Also, KafkaConsumer does not throw exceptions for unsupported codecs, which bit me: I was using lz4, which is not yet supported by the consumer, so it wasn't decoding the messages, nor did it throw an exception.Violaceous

The difference between the console consumer and the python consumer code you have posted is that the python consumer uses a consumer group to save offsets: group_id="test-consumer-group". If instead you set group_id=None, you should see the same behavior as the console consumer.
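
For reference, a minimal sketch of what that looks like (the topic name and broker address are placeholders; parameter names are those of kafka-python 1.x+, which differ slightly from the 0.9.x names in the question):

from kafka import KafkaConsumer

# No consumer group means no offsets are committed; together with
# auto_offset_reset='earliest' this mimics `kafka-console-consumer --from-beginning`.
consumer = KafkaConsumer(
    'my-topic',                            # placeholder topic name
    bootstrap_servers=['localhost:9092'],  # placeholder broker address
    group_id=None,
    enable_auto_commit=False,
    auto_offset_reset='earliest',
)

for message in consumer:
    print(message.offset, message.value)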

Tetanize answered 15/3, 2016 at 4:33 Comment(3)
Yes, that was one of the problems. The actual issue was that the producer was using lz4 as the compression type, which wasn't supported by the python consumer, and the consumer bailed out without a warning/error.Violaceous
LZ4 support was added to kafka-python in 1.0; the latest version should also no longer silently fail on compression errors.Tetanize
This faulty behaviour (for single node/partition Kafka) had me spinning for quite some time until I found this answer.Kalli

auto_offset_reset='earliest' solved it for me.

Mutinous answered 16/5, 2019 at 8:48 Comment(0)

I ran into the same problem: I could receive messages in the kafka console but couldn't get messages with a python script using the kafka-python package.

Finally I figured out the reason: I didn't call producer.flush() and producer.close() in my producer.py, which is not mentioned in the documentation.
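
For illustration, a minimal producer sketch that flushes and closes before exiting (topic name and broker address are placeholders):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])  # placeholder broker

for i in range(10):
    # send() is asynchronous; records are only batched in memory at this point
    producer.send('my-topic', ('msg' + str(i)).encode())

# Without flush()/close(), a short-lived script can exit before the
# batched records are actually delivered to the broker.
producer.flush()
producer.close()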

Nw answered 25/11, 2020 at 11:33 Comment(0)

auto_offset_reset='earliest' and group_id=None solved it for me.

Bejewel answered 4/12, 2019 at 9:13 Comment(0)

My take: print the offsets and make sure they are what you expect, using position() and seek_to_beginning(); please see the comments in the code.

I can't explain:

  1. Why, after instantiating KafkaConsumer, the partitions are not assigned. Is this by design? The workaround is to call poll() once before seek_to_beginning() (an alternative using assign() is sketched after the output below).
  2. Why, sometimes after seek_to_beginning(), the first call to poll() returns no data and doesn't change the offset.

Code:

import kafka
print(kafka.__version__)
from kafka import KafkaProducer, KafkaConsumer
from time import sleep
KAFKA_URL = 'localhost:9092' # kafka broker
KAFKA_TOPIC = 'sida3_sdtest_topic' # topic name

# ASSUMING THAT the topic exist

# write to the topic
producer = KafkaProducer(bootstrap_servers=[KAFKA_URL])
for i in range(20):
    producer.send(KAFKA_TOPIC, ('msg' + str(i)).encode() )
producer.flush()

# read from the topic
# auto_offset_reset='earliest', # auto_offset_reset is needed when offset is not found, it's NOT what we need here
consumer = KafkaConsumer(KAFKA_TOPIC,
                         bootstrap_servers=[KAFKA_URL],
                         max_poll_records=2,
                         group_id='sida3')

# (!?) why do we need this to get partitions assigned?
# AssertionError: No partitions are currently assigned if poll() is not called
consumer.poll()
consumer.seek_to_beginning()

# also AssertionError: No partitions are currently assigned if poll() is not called
print('partitions of the topic: ',consumer.partitions_for_topic(KAFKA_TOPIC))

from kafka import TopicPartition
print('before poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))

# (!?) sometimes the first call to poll() returns nothing and doesn't change the offset
messages = consumer.poll()
sleep(1)
messages = consumer.poll()

print('after poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))

print('messages: ', messages)

Output:

2.0.1
partitions of the topic:  {0, 1}
before poll() x2: 
0
0
after poll() x2: 
0
2
messages:  {TopicPartition(topic='sida3_sdtest_topic', partition=1): [ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=0, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg0', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1), ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=1, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg1', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1)]}
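
As far as I can tell, one way to avoid the poll()-before-seek_to_beginning() workaround is to assign the partitions manually instead of subscribing by topic name; with manual assignment, seek_to_beginning() can be called right away. A sketch, reusing the imports and constants from the code above:

consumer = KafkaConsumer(bootstrap_servers=[KAFKA_URL], group_id='sida3')
partitions = [TopicPartition(KAFKA_TOPIC, p)
              for p in consumer.partitions_for_topic(KAFKA_TOPIC)]
consumer.assign(partitions)    # manual assignment, no group rebalance to wait for
consumer.seek_to_beginning()   # partitions are already assigned, so this works immediately
print(consumer.poll(timeout_ms=1000))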
Bullfight answered 17/9, 2020 at 9:52 Comment(0)

I faced the same issue before, so I ran kafka-topics locally on the machine running the code to test, and I got UnknownHostException. I added the IP and the hostname to the hosts file, and it worked fine in both kafka-topics and the code. It seems that KafkaConsumer was trying to fetch the messages but failed without raising any exception.
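
For example, the fix amounts to a single line in /etc/hosts (the IP and hostname here are hypothetical; use whatever advertised hostname the broker reports):

192.168.1.50    my-kafka-broker    # hypothetical broker IP and its advertised hostname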

Lonergan answered 24/5, 2021 at 14:29 Comment(0)

For me, I had to specify the router's IP in the kafka PLAINTEXT configuration.

Get the router's IP with:

echo $(ifconfig | grep -E "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep -v 127.0.0.1 | awk '{ print $2 }' | cut -f2 -d: | head -n1)

and then add PLAINTEXT_HOST://<router_ip>:9092 to the Kafka advertised listeners. In the case of a Confluent Docker service, the configuration is as follows:

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://172.28.0.1:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

and finally the python consumer is:

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['172.28.0.1:9092'],
    auto_offset_reset='earliest',
    group_id=None,
)

print('Listening')
for msg in consumer:
    print(msg)

Gabble answered 7/4, 2022 at 8:45 Comment(0)
