Kafka Consumer's poll() method gets blocked
I'm new to Kafka 0.9 and, while testing some features, I noticed strange behaviour in the Java consumer implementation (KafkaConsumer).

The Kafka broker is located in an Ambari external machine.

Even though I could implement a Producer and start sending messages to the external broker, I have no clue why the consumer gets stuck when it tries to read the events (poll).

I know the producer is working well, since I can consume messages through the console consumer (which runs locally on Ambari). But when I execute the Java consumer, nothing happens; it just gets stuck. Debugging the code, I could see that it blocks at the poll() line:

    ConsumerRecords<String, String> records = consumer.poll(100);

The timeout does nothing, by the way: it doesn't matter whether you pass 0, 100 or 1000 ms, the consumer blocks on this line and neither times out nor throws an exception.

I tried all kinds of alternative properties, such as advertised.host.name, advertised.listeners, and so on, with zero luck.
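For reference, a minimal sketch of the consumer setup in question (the broker address, group id, and topic name below are placeholders, not values from the original post):

```java
import java.util.Properties;

public class ConsumerSetupSketch {
    // Builds a minimal consumer configuration; "ambari-host:9092" is a
    // placeholder for the external broker's advertised address.
    static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "test-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("ambari-host:9092");
        // A KafkaConsumer<String, String> built from these props is where
        // poll() hangs when the broker's advertised listener is unreachable:
        //   consumer.subscribe(Collections.singletonList("test-topic"));
        //   ConsumerRecords<String, String> records = consumer.poll(100);
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```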

Any help would be highly appreciated. Thanks in advance!

Trimaran answered 7/6, 2016 at 14:36 Comment(8)
Are you able to consume the messages a different way, e.g. by using kafka-console-consumer.sh?Tonguetied
Yes, I am. From the machine which hosts Ambari, I can consume messages through the console consumer.Trimaran
And what about from the machine that you run your consumer on? Did you try the console consumer there?Tonguetied
I didn't, since I don't have Kafka installed on it, nor ZooKeeper.Trimaran
You won't need to install ZooKeeper there, just unzip the Kafka binaries somewhere. If you want to rule out things like network connectivity issues (firewalls, etc.) then you pretty much have to do this; otherwise you can't rule out those sorts of problems. Your problem could be as simple as your consumer not being able to connect to your ZooKeeper instance because of firewall issues.Tonguetied
Thanks for your help, David. I will update this if I have good news ; )Trimaran
Asier, any luck on this? I'm having a similar issue, I think it may be the same one, and haven't figured it out yet: #37770524Lumisterol
Is there any way to debug this? I'm having the same issue with a consumer seemingly not consuming.Coach
The reason might be that the machine where your consumer code runs is unable to connect to ZooKeeper. Try running the same consumer code on the machine where your Kafka is installed (I tried this and it worked for me). I also solved the problem by setting the properties below in the server.properties file:

    # The IP address which you want to expose; in my case it is the public IP
    # of the EC2 machine (I have Kafka and ZooKeeper on the same EC2 machine).
    advertised.host.name=<ip address which you want to expose>
    advertised.port=9092

Regarding the statement:

ConsumerRecords<String, String> records = consumer.poll(100);

The above statement doesn't mean the consumer will time out after 100 ms; rather, it is the polling period: whatever data is fetched within those 100 ms is returned in the records collection.
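To rule out plain connectivity problems (firewalls, wrong advertised address) between the consumer machine and the broker or ZooKeeper hosts, a quick TCP reachability check can be sketched like this (host and port below are placeholders):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unknown host, connection refused, and connect timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // "ambari-host" is a placeholder for the broker's advertised hostname.
        System.out.println("broker reachable: "
                + canConnect("ambari-host", 9092, 2000));
    }
}
```

If this returns false from the consumer machine but true from the broker machine, the hang is a network/advertised-listener problem, not a consumer bug.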

Gisarme answered 15/12, 2016 at 19:41 Comment(2)
> Whatever data it captures in 100 ms is read into records collection. That's not necessarily correct. The timeout is the upper bound on the poll time for when there are no records available for immediate consumption. From the javadoc (KafkaConsumer.poll(Duration timeout)): ``` This method returns immediately if there are records available. Otherwise, it will await the passed timeout. If the timeout expires, an empty record set will be returned. Note that this method may block beyond the timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks. ```Andrien
In my case, even when I use KafkaConsumer.poll(Duration timeout), it sometimes gets stuck in an infinite loop and doesn't come out. Not sure how to make this a bit more robust and stable. Any ideas?James
In my case, the poll() method was finally stuck in the endless loop ensureCoordinatorReady(). The word "coordinator" hinted that the group coordinator runs on another host (for test purposes, I had added only one broker host to my /etc/hosts while there are three brokers in total), so the consumer could not reach the consumer coordinator correctly.

So the solution: correctly configure every host running a Kafka broker in the /etc/hosts file.
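For reference, a sketch of what such an /etc/hosts entry set might look like on the consumer machine (the IPs and hostnames below are placeholders; the point is that every broker's hostname must resolve, not just the bootstrap one):

    # /etc/hosts on the consumer machine (placeholder addresses and names)
    192.0.2.11  kafka-broker-1
    192.0.2.12  kafka-broker-2
    192.0.2.13  kafka-broker-3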

Negativism answered 16/8, 2019 at 1:5 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.