There could be multiple root causes to this issue, such as faulty networking, but in the case I just recovered from, the problem originated from corruption on a cloud volume.
As I said, I recovered my data from a similar situation. Here are the steps I took.
1- Diagnose the issue
I had the following exception:
org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from my-topic-0. If needed, please seek past the record to continue consumption.
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from my-topic-0. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1507)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:733)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:684)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:980)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:933)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
... 1 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Record batch for partition my-topic-0 at offset 40382641 is invalid, cause: Record is corrupt (stored crc = 3003617146, computed crc = 1717784004)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1565)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1609)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1666)
... 11 common frames omitted
You can see that the record batch at offset 40382641 is corrupted: the stored CRC does not match the computed one.
Using kafka-topics, you can see which brokers host each partition of your topic. In my case, partition 0, which contains the corrupted record, is located on brokers 105 and 102, with 105 being the partition leader (the replica consumers fetch from).
$ kafka-topics --bootstrap-server localhost:9092 --topic "my-topic" --describe
Topic: my-topic TopicId: somethingsomething PartitionCount: 3 ReplicationFactor: 2 Configs: cleanup.policy=compact,message.format.version=3.0-IV1,max.compaction.lag.ms=9223372036854775807
Topic: my-topic Partition: 0 Leader: 105 Replicas: 105,102 Isr: 102,105
Topic: my-topic Partition: 1 Leader: 104 Replicas: 104,107 Isr: 104,107
Topic: my-topic Partition: 2 Leader: 104 Replicas: 104,105 Isr: 104,105
Using kafka-log-dirs, you can find where each partition is actually located on disk.
$ kafka-log-dirs --bootstrap-server localhost:9092 --topic-list "my-topic" --describe
{
"version": 1,
"brokers": [
{
"broker": 102,
"logDirs": [
{
"logDir": "/var/lib/kafka/data",
"error": null,
"partitions": [
{
"partition": "my-topic-0",
"size": 6979728777,
"offsetLag": 0,
"isFuture": false
}
]
}
]
},
{
"broker": 104,
"logDirs": [
{
"logDir": "/var/lib/kafka/data",
"error": null,
"partitions": [
{
"partition": "my-topic-1",
"size": 5710975291,
"offsetLag": 0,
"isFuture": false
},
{
"partition": "my-topic-2",
"size": 7278768636,
"offsetLag": 0,
"isFuture": false
}
]
}
]
},
{
"broker": 105,
"logDirs": [
{
"logDir": "/var/lib/kafka/data",
"error": null,
"partitions": [
{
"partition": "my-topic-0",
"size": 5689809487,
"offsetLag": 0,
"isFuture": false
},
{
"partition": "my-topic-2",
"size": 7278768636,
"offsetLag": 0,
"isFuture": false
}
]
}
]
},
{
"broker": 107,
"logDirs": [
{
"logDir": "/var/lib/kafka/data",
"error": null,
"partitions": [
{
"partition": "my-topic-1",
"size": 7272418267,
"offsetLag": 0,
"isFuture": false
}
]
}
]
}
]
}
If you list the files in the partition directory on the broker, you'll see Kafka's internal log segment files.
$ ls /var/lib/kafka/data/my-topic-0
00000000000000000000.index 00000000000035574919.timeindex 00000000000038720664.index 00000000000039571633.snapshot
00000000000000000000.log 00000000000036128542.snapshot 00000000000038720664.log 00000000000039571633.timeindex
00000000000000000000.timeindex 00000000000037231368.index 00000000000038720664.snapshot 00000000000039876892.index
00000000000028017178.index 00000000000037231368.log 00000000000038720664.timeindex 00000000000039876892.log
00000000000028017178.log 00000000000037231368.snapshot 00000000000038960720.index 00000000000039876892.snapshot
00000000000028017178.snapshot 00000000000037231368.timeindex 00000000000038960720.log 00000000000039876892.timeindex
00000000000028017178.timeindex 00000000000038203142.index 00000000000038960720.snapshot 00000000000040268605.index
00000000000034752670.index 00000000000038203142.log 00000000000038960720.timeindex 00000000000040268605.log
00000000000034752670.log 00000000000038203142.snapshot 00000000000039269933.index 00000000000040268605.snapshot
00000000000034752670.snapshot 00000000000038203142.timeindex 00000000000039269933.log 00000000000040268605.timeindex
00000000000034752670.timeindex 00000000000038496881.index 00000000000039269933.snapshot leader-epoch-checkpoint
00000000000035574919.index 00000000000038496881.log 00000000000039269933.timeindex partition.metadata
00000000000035574919.log 00000000000038496881.snapshot 00000000000039571633.index
00000000000035574919.snapshot 00000000000038496881.timeindex 00000000000039571633.log
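The corrupted offset lives in the segment whose base offset (the number in the file name) is the largest one less than or equal to it, here 00000000000040268605.log. If you want a quick shell one-liner to find it (just a sketch, adjust the data directory and target offset to your case):
$ ls /var/lib/kafka/data/my-topic-0 | grep '\.log$' | sort | awk -v target=40382641 '($1+0) <= target { seg=$1 } END { print seg }'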
Then, using kafka-dump-log, you can inspect the log file to check the faulty records. Here I'm just using --index-sanity-check because I only want to check whether I have a valid copy of the corrupted data somewhere. Inspecting the segment covering the corrupted offset on both brokers 105 and 102, it looks like the records from 40382641 to 40382647 are invalid on broker 105, but valid on broker 102. If all your replicas have corrupted data, you'd better hope you have backups...
[admin@broker105:~]$ kafka-dump-log --files "/var/lib/kafka/data/my-topic-0/00000000000040268605.log" --index-sanity-check
Dumping /var/lib/kafka/data/my-topic-0/00000000000040268605.log
Log starting offset: 40268605
baseOffset: 40382641 lastOffset: 40382647 count: 7 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 57 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 311110755 CreateTime: 1714629806898 size: 15025 magic: 2 compresscodec: none crc: 3003617146 isvalid: false
Exception in thread "main" org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /var/lib/kafka/data/my-topic-0/00000000000040268605.log
[admin@broker102:~]$ kafka-dump-log --files "/var/lib/kafka/data/my-topic-0/00000000000040046093.log" --index-sanity-check
baseOffset: 40382641 lastOffset: 40382647 count: 7 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 57 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 893585739 CreateTime: 1714629806898 size: 15025 magic: 2 compresscodec: none crc: 3003617146 isvalid: true
At this point, if you just want to restart your consumers and do not need the corrupted records to be recovered, you could simply seek past the corrupted batch to the next offset. Here I would need to skip to offset 40382648.
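For example, kafka-consumer-groups can reset the committed offsets of your consumer group past the corrupted batch. This is only a sketch: it assumes a consumer group named my-app (for a Kafka Streams application, the group id is its application.id), and the group must be stopped while you reset its offsets. Run it with --dry-run first, then --execute.
$ kafka-consumer-groups --bootstrap-server localhost:9092 --group my-app --topic my-topic:0 --reset-offsets --to-offset 40382648 --dry-run
$ kafka-consumer-groups --bootstrap-server localhost:9092 --group my-app --topic my-topic:0 --reset-offsets --to-offset 40382648 --execute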
2- Recovery
The idea is to force a partition leader change so that the broker with valid data serves it, and the one with corrupted data becomes the secondary. The corrupted data will still be there, just unused. In the next step, we'll see how to clean it up.
The kafka-reassign-partitions utility allows you to manually change the distribution of partitions across your brokers. You give it a reassignment JSON file describing the new assignment. In this example, I just swapped brokers 105 and 102 so that 102 becomes the preferred leader and 105 the secondary. I first ran --verify to make sure I had entered the right partitions.
$ echo '{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[102,105]}]}' >manual-reassign.json
$ kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file manual-reassign.json --verify
Status of partition reassignment:
There is no active reassignment of partition my-topic-0, but replica set is 105,102 rather than 102,105.
Clearing broker-level throttles on brokers 101,110,106,102,109,105,112,104,107,111,103,108
Clearing topic-level throttles on topic my-topic
$ kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file manual-reassign.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[102,105],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignment for my-topic-0
Now you have changed the preferred partition leader for my-topic-0, but broker 102 is not the leader yet. An election needs to be triggered. This can be done using kafka-leader-election.
$ kafka-leader-election --bootstrap-server localhost:9092 --topic my-topic --partition 0 --election-type PREFERRED
Successfully completed leader election (PREFERRED) for partitions my-topic-0
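You can confirm the change with the same kafka-topics --describe command as in the diagnosis step; partition 0 should now report broker 102 as its leader.
$ kafka-topics --bootstrap-server localhost:9092 --topic "my-topic" --describe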
At this point, you can restart your consumer from the offset that used to be corrupted. Since the new partition leader has valid data, it should start without issue and catch up on its lag! However, your corrupted data is not gone yet. It is still on the secondary broker and needs to be cleaned up. There is no point having a replica if it holds corrupted data.
3- Cleaning corrupted data
We can use the same process of reassigning the partitions with kafka-reassign-partitions. This time however, we won't simply change the preferred leader, we will change the secondary broker. This way, the new secondary broker will replicate the data from the leader, and the old secondary broker will eventually clean up its corrupted data. Here I will move the replica from broker 105 to broker 107.
$ echo '{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[102,107]}]}' >change-replica.json
$ kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file change-replica.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[102,105],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignment for my-topic-0
Using the --verify flag, you can follow the progress of the replication.
$ kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file change-replica.json --verify
Status of partition reassignment:
Reassignment of partition my-topic-0 is still in progress.
$ kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file change-replica.json --verify
Status of partition reassignment:
Reassignment of partition my-topic-0 is completed.
Clearing broker-level throttles on brokers 101,110,106,102,109,105,112,104,107,111,103,108
Clearing topic-level throttles on topic my-topic
Now you can verify that your new replica holds valid data, using kafka-dump-log like we did in the diagnosis step.
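For example, on the new secondary broker (the segment file names on broker 107 will differ from the ones listed above, so pick the one covering the previously corrupted offset):
[admin@broker107:~]$ kafka-dump-log --files "/var/lib/kafka/data/my-topic-0/<segment>.log" --index-sanity-check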
Also, note that instead of reassigning the partition to a different broker, you could try to delete the partition's data files directly on the corrupted secondary broker and let it re-replicate from the leader. However, it's generally better to use the public APIs and tools provided with Kafka rather than messing with internal files, unless you really know what you're doing.
Cheers!