Kafka 1.1.1-cp1. (Edit 4: I ended up filing a bug with Kafka about this - https://issues.apache.org/jira/browse/KAFKA-7447)
I have 3 brokers, with min.insync.replicas = 2
for all topics, and offsets.commit.required.acks = -1
.
When I stop one of the brokers, as you'd expect, it hands off the partitions it is leader for, and everything carries on as normal (consumers consuming, producers producing).
The problems start when I bring the broker back. What seems to happen is that it causes confusion in the cluster, and some __consumer_offset topics get immediately truncated to 0.
Here's a selection of logs, in chronological order, from an affected __consumer_offset partition (one that was originally lead by the broker which went down). The story plays out across logs from all three brokers.
Essentially, the broker that I bounced comes back, seemingly can't understand what the new leader means, truncates to 0, and then persuades the other replicas to truncate to 0 as well.
prod-kafka-2: (just starting up)
[2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Based on follower's leader epoch, leader replied with an unknown offset in __consumer_offsets-29. The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)
prod-kafka-3: (sees replica1 come back)
[2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
prod-kafka-2:
[2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling unloading of offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached groups. (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions __consumer_offsets-29 (kafka.server.ReplicaFetcherManager)
[2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous Leader Epoch was: 77 (kafka.cluster.Partition)
[2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling loading of offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:24:03,288] INFO [GroupMetadataManager brokerId=1] Finished loading offsets and group metadata from __consumer_offsets-29 in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
prod-kafka-3: struggling to agree with prod-kafka-2. Kicks it out of ISR, but then fights with ZooKeeper. Perhaps 2 and 3 both think they're leader?
[2018-09-17 09:24:15,372] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:15,377] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
prod-kafka-2: rudely kicks BOTH of the other two replicas out of the ISR list, even though 2 is the one we just restarted and therefore is most likely behind. (Bear in mind that it already decided to truncate the topic to 0!)
[2018-09-17 09:24:16,481] INFO [Partition __consumer_offsets-29 broker=1] Shrinking ISR from 0,2,1 to 1 (kafka.cluster.Partition)
prod-kafka-3: still fighting with zookeeper. Eventually loses.
[2018-09-17 09:24:20,374] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:20,378] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:25,347] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:25,350] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:30,359] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:30,362] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:35,365] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:35,368] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:40,352] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:40,354] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:45,422] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:45,425] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:50,345] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:50,348] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:55,444] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:55,449] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:00,340] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:00,343] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:05,374] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:05,377] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:10,342] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:10,344] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:15,348] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:15,351] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:20,338] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:20,340] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:25,338] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:25,340] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:30,382] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:30,387] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:35,341] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:35,344] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:40,460] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:40,465] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:45,335] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:45,338] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
prod-kafka-1: suddenly gets confused and also re-inits to 0, as prod-kafka-2 apparently becomes leader.
[2018-09-17 09:25:48,807] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote broker is not the leader for partition __consumer_offsets-29, which could indicate that the partition is being moved (kafka.server.ReplicaFetcherThread)
prod-kafka-3: finally decides that prod-kafka-2 is in charge, truncates accordingly
[2018-09-17 09:25:48,806] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions __consumer_offsets-29 (kafka.server.ReplicaFetcherManager)
[2018-09-17 09:25:48,807] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([__consumer_offsets-29, initOffset 0 to broker BrokerEndPoint(1,prod-kafka-2.c.i-lastfm-prod.internal,9092)] ) (kafka.server.ReplicaFetcherManager)
[2018-09-17 09:25:48,809] INFO [GroupMetadataManager brokerId=2] Scheduling unloading of offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:25:48,810] INFO [GroupMetadataManager brokerId=2] Finished unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached groups. (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:25:48,950] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Based on follower's leader epoch, leader replied with an unknown offset in __consumer_offsets-29. The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)
[2018-09-17 09:25:48,951] INFO [Log partition=__consumer_offsets-29, dir=/var/lib/kafka/data] Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.Log)
prod-kafka-1: leadership inauguration confirmed.
[2018-09-17 09:25:50,207] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote broker is not the leader for partition __consumer_offsets-29, which could indicate that the partition is being moved (kafka.server.ReplicaFetcherThread)
prod-kafka-2: now that it has asserted its dominance via zookeeper, prod-kafka-3 added to the ISR list
[2018-09-17 09:25:50,210] INFO [Partition __consumer_offsets-29 broker=1] Expanding ISR from 1 to 1,2 (kafka.cluster.Partition)
prod-kafka-1: still struggling to accept reality, but eventually also truncates to 0.
[2018-09-17 09:25:51,430] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote broker is not the leader for partition __consumer_offsets-29, which could indicate that the partition is being moved (kafka.server.ReplicaFetcherThread)
[2018-09-17 09:25:52,615] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote broker is not the leader for partition __consumer_offsets-29, which could indicate that the partition is being moved (kafka.server.ReplicaFetcherThread)
[2018-09-17 09:25:53,637] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote broker is not the leader for partition __consumer_offsets-29, which could indicate that the partition is being moved (kafka.server.ReplicaFetcherThread)
[2018-09-17 09:25:54,150] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions __consumer_offsets-29 (kafka.server.ReplicaFetcherManager)
[2018-09-17 09:25:54,151] INFO [ReplicaFetcherManager on broker 0] Added fetcher for partitions List([__consumer_offsets-29, initOffset 0 to broker BrokerEndPoint(1,prod-kafka-2.c.i-lastfm-prod.internal,9092)] ) (kafka.server.ReplicaFetcherManager)
[2018-09-17 09:25:54,151] INFO [GroupMetadataManager brokerId=0] Scheduling unloading of offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:25:54,153] INFO [GroupMetadataManager brokerId=0] Finished unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached groups. (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:25:54,261] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Based on follower's leader epoch, leader replied with an unknown offset in __consumer_offsets-29. The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)
[2018-09-17 09:25:54,261] INFO [Log partition=__consumer_offsets-29, dir=/var/lib/kafka/data] Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.Log)
prod-kafka-2: completes its coup of consumer offsets, all is now 0.
[2018-09-17 09:25:56,244] INFO [Partition __consumer_offsets-29 broker=1] Expanding ISR from 1,2 to 1,2,0 (kafka.cluster.Partition)
Edit:
As requested, here is kafka server.properties file:
broker.id=1
default.replication.factor=3
auto.create.topics.enable=false
min.insync.replicas=2
num.network.threads=12
num.io.threads=16
num.replica.fetchers=6
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=4
offsets.retention.minutes=10080
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.flush.interval.messages=20000
log.flush.interval.ms=10000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=60000
zookeeper.connect=prod-kafka-1:2181,prod-kafka-2:2181,prod-kafka-3:2181
zookeeper.connection.timeout.ms=6000
confluent.support.metrics.enable=false
confluent.support.customer.id=anonymous
group.initial.rebalance.delay.ms=3000
And here is the zookeeper.properties file:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=prod-kafka-1:2888:3888
server.2=prod-kafka-2:2888:3888
server.3=prod-kafka-3:2888:3888
autopurge.purgeInterval=12
autopurge.snapRetainCount=6
Edit 2 Upgrading to Kafka-2.0.0 didn't seem to solve the problem.
It might be that my incoming rate is too high and that I need to throttle the producers when I know that my crashed server is about to recover? Does that sound right…?
Edit 3
setting auto.leader.rebalance.enable=false
solved the problem, but now I have to manually rebalance. However, manually rebalancing when all partitions are caught up doesn't seem to pose any issues.
/var/lib/kafka/data
mounted under some temporary file system that's cleared when you reboot? – Schweitzer/brokers/ids/ID
value and stopping the rogue broker process stopped the reseting offsets for us – Schweitzeroffsets.topic.retention.minutes
andoffsets.topic.segment.bytes
– Radius