Rebalancing issue while reading messages in Kafka

I am trying to read messages from a Kafka topic, but I am unable to read any. The process gets killed after some time, without consuming any messages.

Here is the rebalancing error which I get:

[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer:  (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages

I tried to run ConsumerOffsetChecker, and this is the error I get. I have no clue how to resolve this. Does anybody have any idea?

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group  topic_group
Group           Topic                          Pid Offset          logSize         Lag             Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
        at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        ... 16 more
Claymore answered 21/3, 2014 at 15:50 Comment(4)
Given the message in the exception, it seems to me that the Kafka consumer was able to connect to ZK, but it didn't find the /consumers path in there, which sounds like the ZK database was corrupted – Pembrook
I sometimes experience this error as well. However, connecting via the console-consumer script as user2720864 outlines below works fine. – Corves
This thread on the Kafka-users list talks about this issue: mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/… – Corves
Is the issue resolved? If yes, how? – Cuculiform

I have run into similar problems recently. You can try increasing the consumer configurations rebalance.backoff.ms and zookeeper.session.timeout.ms to about 5-10 seconds.

The first parameter tells Kafka to wait longer before retrying a rebalance. The second one tells Kafka to be more patient while trying to connect to ZooKeeper.

Other configuration options can be found in the official documentation.
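For example, with the old high-level (ZooKeeper-based) consumer, these can go in the consumer's properties file. The exact values below are illustrative, not prescriptive:

```properties
# Wait longer between rebalance attempts (default is 2000 ms)
rebalance.backoff.ms=5000
# Allow more attempts before giving up with ConsumerRebalanceFailedException (default is 4)
rebalance.max.retries=10
# Give the ZooKeeper session more time before it is considered dead (default is 6000 ms)
zookeeper.session.timeout.ms=10000
```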

Gastro answered 29/3, 2014 at 15:1 Comment(3)
Sorry, I misunderstood. First I tried changing these options on the server, which did nothing. Then I changed them on the consumer. This allowed me to connect, but now I get a java.nio.channels.ClosedByInterruptException. On the list grokbase.com/t/kafka/users/141ej00nf9/… they point to the wiki, which says GC usage might be the culprit: cwiki.apache.org/confluence/display/KAFKA/… I don't believe that in my case, because the application is just starting. – Corves
@MarkButler I have seen those errors on startup when there are other running consumers. Have you verified in ZK that the rebalancing is OK after all? It is in the ZK path /consumers/[group_name]/owners/[topic name] – Abdul
I tried the above-mentioned steps but nothing has changed; I'm still seeing the same error. – Cuculiform

This probably means that the brokers did not create those nodes correctly when they connected to ZooKeeper. The /consumers path should exist when you try to consume.

Here are a few paths for debugging:

Did you create any topics?

If so:

  1. How many partitions are there in the topic?
  2. Did you check that the topic metadata was correctly populated in ZooKeeper?
  3. Can we see your consumer configuration?

If not:

  1. Then you need to create a topic using the script $KAFKA_DIR/bin/kafka-create-topic.sh. Look inside the script for usage details.
  2. Once you make a topic, you need to create a consumer with a group ID that has not been used before, otherwise you will not start fresh.
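
A sketch of those two steps for a 0.8.0-era installation (script names and flags may differ slightly between versions, and the localhost addresses are assumptions):

```
# Create a topic with one partition and one replica
# (0.8.0 script; later versions use kafka-topics.sh --create instead)
$KAFKA_DIR/bin/kafka-create-topic.sh --zookeeper localhost:2181 \
    --topic mytopic --partition 1 --replica 1

# The console consumer generates a fresh, random group ID by default,
# so it always starts with a clean offset state
$KAFKA_DIR/bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
    --topic mytopic --from-beginning
```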
Luminiferous answered 27/3, 2014 at 3:45 Comment(0)

There is a bug in kafka.tools.ConsumerOffsetChecker. If a particular ZooKeeper node holding consumed-offset information doesn't exist, the tool exits, throwing the exception.

For example, suppose you have a consumer group "mygroup" and a topic "topictest". Then the offset for partition 2 is maintained in the znode /consumers/mygroup/offsets/topictest/2.

If there is no entry for partition 2 of topic topictest in ZooKeeper, then the ConsumerOffsetChecker tool will exit while checking the offset for partition 2. Basically, it will fail on the first partition "n" for which the znode /consumers/mygroup/offsets/topictest/n is missing in ZooKeeper.
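
You can confirm which offset znodes actually exist with the ZooKeeper CLI (the group and topic names here are the example ones from above; the localhost address is an assumption):

```
# From the ZooKeeper installation (Kafka also bundles zookeeper-shell.sh)
bin/zkCli.sh -server localhost:2181

# Inside the shell, list the per-partition offset znodes and read one
ls /consumers/mygroup/offsets/topictest
get /consumers/mygroup/offsets/topictest/2
```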

Hokku answered 22/4, 2014 at 7:56 Comment(2)
Is there any JIRA ticket about it at issues.apache.org/jira/browse/ZOOKEEPER or issues.apache.org/jira/browse/KAFKA? – Subsidiary
@Filippo I don't know, but the tool can be fixed easily. – Hokku

Probably your brokers are offline and unable to connect to ZooKeeper. Have you tried running the console-consumer script available in $KAFKA_ROOT_DIR/bin to check whether you are able to consume from a specific topic?
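
For instance, a quick smoke test along those lines (assuming a local 0.8-era setup with ZooKeeper on its default port):

```
$KAFKA_ROOT_DIR/bin/kafka-console-consumer.sh \
    --zookeeper localhost:2181 --topic mytopic --from-beginning
```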

Selfpossessed answered 27/3, 2014 at 12:52 Comment(0)

Another cause might be jar conflicts. If the same jar is stored with different versions in the library folder, this issue may arise. Jars such as scala-library, zkclient, zookeeper, and kafka-client should not be duplicated with different versions.
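
A quick way to spot such duplicates is to strip the version suffix from each jar file name and look for repeats. The directory and file names below are made up purely for illustration:

```shell
# Build a throwaway directory that simulates a libs/ folder with a conflict
mkdir -p /tmp/kafka-libs-demo
touch /tmp/kafka-libs-demo/zkclient-0.3.jar \
      /tmp/kafka-libs-demo/zkclient-0.4.jar \
      /tmp/kafka-libs-demo/scala-library-2.10.1.jar

# Strip the trailing "-<version>.jar" and print any base name seen twice
ls /tmp/kafka-libs-demo | sed -E 's/-[0-9][0-9.]*\.jar$//' | sort | uniq -d
# -> zkclient
```

Run the same pipeline against your real library folder; any name it prints is packaged in more than one version.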

Wines answered 2/3, 2018 at 10:10 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.