Kafka: No broker in ISR for partition

We have a Kafka cluster consisting of 6 nodes. Five of the 6 nodes also run ZooKeeper.

A Spark Streaming job reads from a streaming server, does some processing, and sends the results to Kafka.
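
Roughly, the producing side of the job looks like the sketch below (the topic name, broker addresses, and serializers are placeholders rather than our exact code); each micro-batch writes its records with a plain Kafka producer:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.streaming.dstream.DStream

    // Rough sketch of the producing side; "live_stream" and the broker list are placeholders.
    def writeToKafka(processed: DStream[String]): Unit = {
      processed.foreachRDD { rdd =>
        rdd.foreachPartition { records =>
          val props = new Properties()
          props.put("bootstrap.servers", "broker1:9092,broker2:9092")
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          // One producer per task partition; the real job reuses producers instead of recreating them.
          val producer = new KafkaProducer[String, String](props)
          records.foreach(r => producer.send(new ProducerRecord[String, String]("live_stream", r)))
          producer.close()
        }
      }
    }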

From time to time the Spark job gets stuck, no data is sent to Kafka, and the job restarts.

The job keeps getting stuck and restarting until we manually restart the Kafka cluster. After restarting Kafka, everything works smoothly again.

Checking the Kafka logs, we found this exception thrown several times:

2017-03-10 05:12:14,177 ERROR state.change.logger: Controller 133 epoch 616 initiated state change for partition [live_stream_2,52] from OfflinePartition to OnlinePartition failed
kafka.common.NoReplicaOnlineException: No broker in ISR for partition [gnip_live_stream_2,52] is alive. Live brokers are: [Set(133, 137, 134, 135, 143)], ISR brokers are: [142] 
    at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:66)
    at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:345)
    at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:205)
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
    at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
    at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:70)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:333)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:164)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:259)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
    at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

The exception above is thrown for an unused topic (live_stream_2), but it is also thrown for a topic that is in use, with a slight difference.

Here is the exception for the topic that is in use:

2017-03-10 12:05:18,535 ERROR state.change.logger: Controller 133 epoch 620 initiated state change for partition [gnip_live_stream,3] from OfflinePartition to OnlinePartition failed
kafka.common.NoReplicaOnlineException: No broker in ISR for partition [live_stream,3] is alive. Live brokers are: [Set(133, 134, 135, 137)], ISR brokers are: [136] 
    at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:66)
    at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:345)
    at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:205)
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
    at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
    at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
    at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:70)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:333)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:164)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:259)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
    at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

In the first exception, the ISR list for partition 52 contains only broker 142, which is strange because the cluster has no broker with that ID.

In the second exception, the ISR list for partition 3 contains only broker 136, which does not appear in the live brokers list.

I suspect there is stale data in ZooKeeper that causes the first exception, and that broker 136 was down at that specific time, which causes the second exception.
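
To double-check what the controller sees for these partitions, describing the topic shows the leader and ISR per partition; kafka-topics.sh --describe against ZooKeeper gives this from the command line, and the sketch below does the same programmatically with the Java AdminClient (it assumes a 0.11+ kafka-clients jar on the classpath and uses placeholder broker and topic names):

    import java.util.Properties
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
    import scala.collection.JavaConverters._

    object CheckIsr {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092") // placeholder address
        val admin = AdminClient.create(props)
        try {
          // Describe the topic and print leader/ISR for every partition.
          val description = admin.describeTopics(List("live_stream").asJava).all().get().get("live_stream")
          description.partitions().asScala.foreach { p =>
            val leader = Option(p.leader()).map(_.id()).getOrElse(-1)
            val isr = p.isr().asScala.map(_.id()).mkString(",")
            println(s"partition=${p.partition()} leader=$leader isr=[$isr]")
          }
        } finally {
          admin.close()
        }
      }
    }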

My questions:

1. Could these exceptions be the reason that Kafka (and consequently the Spark job) gets stuck?

2. How can we solve this?

Valedictorian asked 12/3, 2017 at 12:12

Comments (5):
Did you happen to find an answer as to why this was happening? – Stoop
Hi, did you find any fix for this issue? – Spencerspencerian
Unfortunately, we set up a daily cron job to restart Kafka to avoid this issue. – Valedictorian
We have a similar issue. What is your Kafka version? I suspect it is related to issues.apache.org/jira/browse/KAFKA-3143 or issues.apache.org/jira/browse/KAFKA-3096 – Wulfenite
Can you explain a few things in more detail, such as how the cluster is set up? What versions are you using for both Kafka and ZooKeeper? What do you mean by 5 nodes having ZooKeeper; is it a ZooKeeper cluster? How do you reproduce the error? What is the end-to-end pipeline like? That will help debug it further. – Gwendolin
