Kafka message corrupted in master but replica looks good

Kafka version: 0.9

We have a Java application reading from Kafka, but we are seeing data corruption in Kafka messages.

Below is the error from the Java application:

    org.apache.kafka.common.KafkaException: Error deserializing key/value for partition varnish_raw_logs-31 at offset 568565
    Caused by: org.apache.kafka.common.record.InvalidRecordException: Record is corrupt (stored crc = 1835858402, computed crc = 4025136930)

So I checked which node owns partition 31, so that I could look at its data with the Kafka dump log utility. As you can see below, partition 31 is led by node 4, and node 2 holds its replica.

    Topic: varnish_raw_logs Partition: 31   Leader: 4       Replicas: 4,2   Isr: 4,2

When I use the dump utility to look at the logs on both node 4 and node 2, I see that the data is valid on the replica (node 2) but corrupted on the master (node 4).

[Screenshot not preserved: dump output of the corrupted master (left) and the replica (right)]

So my question is: what is the root cause of the master (node 4) getting into this state? You could argue that we should simply skip the events that fail, but I don't want to lose these messages, because a good copy exists on the replica.

Tootsy answered 12/4, 2017 at 20:42 Comment(2)
1) Maybe disk corruption on the leader? Can you provide any details about your disk setup and SMART data? 2) You could force a leader switch to get past it: just reverse the replica list (in ZK /brokers/topics/xx, I think). - Staples
Late comment: if at least one replica is good, you can try to force a leader reassignment, then delete the corrupted file and let it re-replicate. - Lowestoft

There could be multiple root causes for this issue, such as faulty networking, but in the case I just recovered from, the problem originated from corruption on a cloud volume.

As I said, I recovered my data from a similar situation. Here are the steps I took.

1- Diagnose the issue

I had the following exception:

org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from my-topic-0. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576) 
Caused by: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from my-topic-0. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1507)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:733)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:684)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:980)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:933)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    ... 1 common frames omitted 
Caused by: org.apache.kafka.common.KafkaException: Record batch for partition my-topic-0 at offset 40382641 is invalid, cause: Record is corrupt (stored crc = 3003617146, computed crc = 1717784004)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1565)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1609)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1666) 
    ... 11 common frames omitted

You can see that the record batch at offset 40382641 is apparently corrupted.
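The "stored crc" vs. "computed crc" in the exception is how the client notices the problem: it recomputes a checksum over the bytes it fetched and compares it with the checksum written in the batch header. The sketch below only illustrates that idea and is not Kafka's actual code; the class name is hypothetical, and it does not reproduce the real batch layout or which byte range the checksum covers. It uses java.util.zip.CRC32C (Java 9+), the checksum family of the v2 record-batch format (magic: 2 in the dump further down); older formats (magic 0 and 1, as in Kafka 0.9 from the question) use plain CRC-32 instead.

```java
import java.util.zip.CRC32C;

// Illustration only: a reader detects corruption by recomputing a checksum over
// the bytes it received and comparing it with the checksum stored alongside them.
// This does not reproduce Kafka's real record-batch layout or covered byte range.
final class CrcCheck {

    /** CRC-32C (Castagnoli) of the given bytes; requires Java 9+. */
    static long crc32c(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        return crc.getValue();
    }

    /** True when the recomputed checksum matches the stored one. */
    static boolean isValid(byte[] payload, long storedCrc) {
        return crc32c(payload) == storedCrc;
    }

    /** Copy of the payload with one bit flipped, simulating on-disk corruption. */
    static byte[] flipBit(byte[] payload, int byteIndex) {
        byte[] copy = payload.clone();
        copy[byteIndex] ^= 0x01;
        return copy;
    }
}
```

If you compute `stored = CrcCheck.crc32c(batch)` and later flip a single bit with `flipBit`, `isValid` returns false: a CRC detects every single-bit error, which is why a damaged sector on the volume surfaces as a "Record is corrupt" exception instead of silently returning bad data.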

Using kafka-topics, you can see which brokers host each partition of your topic. In my case, partition 0, which holds the corrupted record, is located on brokers 105 and 102, with 105 being the partition leader (the broker consumers fetch data from).

$ kafka-topics --bootstrap-server localhost:9092 --topic "my-topic" --describe
Topic: my-topic TopicId: somethingsomething PartitionCount: 3   ReplicationFactor: 2    Configs: cleanup.policy=compact,message.format.version=3.0-IV1,max.compaction.lag.ms=9223372036854775807
    Topic: my-topic Partition: 0    Leader: 105 Replicas: 105,102   Isr: 102,105
    Topic: my-topic Partition: 1    Leader: 104 Replicas: 104,107   Isr: 104,107
    Topic: my-topic Partition: 2    Leader: 104 Replicas: 104,105   Isr: 104,105
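If you have many partitions to check, you could parse these lines instead of reading them by eye. A minimal sketch, assuming the "Partition: N Leader: N Replicas: a,b Isr: a,b" layout shown above; that layout is human-readable output rather than a stable interface, and the class name is hypothetical. For anything beyond a quick script, the AdminClient `describeTopics` API is the more robust route.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for one partition line printed by `kafka-topics --describe`.
// Assumes the "Partition: N Leader: N Replicas: a,b Isr: a,b" layout shown above,
// which is human-readable output rather than a stable interface.
final class PartitionInfo {
    private static final Pattern LINE = Pattern.compile(
        "Partition:\\s*(\\d+)\\s+Leader:\\s*(\\d+)\\s+Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]*)");

    final int partition;
    final int leader;
    final List<Integer> replicas;
    final List<Integer> isr;

    private PartitionInfo(int partition, int leader, List<Integer> replicas, List<Integer> isr) {
        this.partition = partition;
        this.leader = leader;
        this.replicas = Collections.unmodifiableList(replicas);
        this.isr = Collections.unmodifiableList(isr);
    }

    /** Parses a describe line; fails if the leader is not numeric (e.g. "none"). */
    static PartitionInfo parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("Not a partition line: " + line);
        }
        return new PartitionInfo(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)),
            toInts(m.group(3)), toInts(m.group(4)));
    }

    /** Replicas other than the current leader (the "secondary" brokers). */
    List<Integer> followers() {
        List<Integer> result = new ArrayList<>(replicas);
        result.remove(Integer.valueOf(leader)); // remove by value, not by index
        return result;
    }

    private static List<Integer> toInts(String csv) {
        List<Integer> out = new ArrayList<>();
        for (String s : csv.split(",")) {
            if (!s.isEmpty()) {
                out.add(Integer.parseInt(s));
            }
        }
        return out;
    }
}
```

For the partition 0 line above, this yields leader 105 and followers [102], i.e. the broker whose copy you would compare against the leader's.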

Using kafka-log-dirs, you can find where each partition is actually stored on disk.

$ kafka-log-dirs --bootstrap-server localhost:9092 --topic-list "my-topic" --describe
{
  "version": 1,
  "brokers": [
    {
      "broker": 102,
      "logDirs": [
        {
          "logDir": "/var/lib/kafka/data",
          "error": null,
          "partitions": [
            {
              "partition": "my-topic-0",
              "size": 6979728777,
              "offsetLag": 0,
              "isFuture": false
            }
          ]
        }
      ]
    },
    {
      "broker": 104,
      "logDirs": [
        {
          "logDir": "/var/lib/kafka/data",
          "error": null,
          "partitions": [
            {
              "partition": "my-topic-1",
              "size": 5710975291,
              "offsetLag": 0,
              "isFuture": false
            },
            {
              "partition": "my-topic-2",
              "size": 7278768636,
              "offsetLag": 0,
              "isFuture": false
            }
          ]
        }
      ]
    },
    {
      "broker": 105,
      "logDirs": [
        {
          "logDir": "/var/lib/kafka/data",
          "error": null,
          "partitions": [
            {
              "partition": "my-topic-0",
              "size": 5689809487,
              "offsetLag": 0,
              "isFuture": false
            },
            {
              "partition": "my-topic-2",
              "size": 7278768636,
              "offsetLag": 0,
              "isFuture": false
            }
          ]
        }
      ]
    },
    {
      "broker": 107,
      "logDirs": [
        {
          "logDir": "/var/lib/kafka/data",
          "error": null,
          "partitions": [
            {
              "partition": "my-topic-1",
              "size": 7272418267,
              "offsetLag": 0,
              "isFuture": false
            }
          ]
        }
      ]
    }
  ]
}

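If you list the files in the partition directory on the broker, you'll see Kafka's internal files: each log segment (.log) with its offset and time indexes (.index, .timeindex), producer-state snapshots (.snapshot), and a few metadata files.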

$ ls /var/lib/kafka/data/my-topic-0
00000000000000000000.index      00000000000035574919.timeindex  00000000000038720664.index      00000000000039571633.snapshot
00000000000000000000.log        00000000000036128542.snapshot   00000000000038720664.log        00000000000039571633.timeindex
00000000000000000000.timeindex  00000000000037231368.index      00000000000038720664.snapshot   00000000000039876892.index
00000000000028017178.index      00000000000037231368.log        00000000000038720664.timeindex  00000000000039876892.log
00000000000028017178.log        00000000000037231368.snapshot   00000000000038960720.index      00000000000039876892.snapshot
00000000000028017178.snapshot   00000000000037231368.timeindex  00000000000038960720.log        00000000000039876892.timeindex
00000000000028017178.timeindex  00000000000038203142.index      00000000000038960720.snapshot   00000000000040268605.index
00000000000034752670.index      00000000000038203142.log        00000000000038960720.timeindex  00000000000040268605.log
00000000000034752670.log        00000000000038203142.snapshot   00000000000039269933.index      00000000000040268605.snapshot
00000000000034752670.snapshot   00000000000038203142.timeindex  00000000000039269933.log        00000000000040268605.timeindex
00000000000034752670.timeindex  00000000000038496881.index      00000000000039269933.snapshot   leader-epoch-checkpoint
00000000000035574919.index      00000000000038496881.log        00000000000039269933.timeindex  partition.metadata
00000000000035574919.log        00000000000038496881.snapshot   00000000000039571633.index
00000000000035574919.snapshot   00000000000038496881.timeindex  00000000000039571633.log
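Each segment file is named after the first offset it contains (its base offset, zero-padded to 20 digits), so the segment that should hold a given offset is the one with the largest base offset that is less than or equal to it. A minimal sketch of that lookup with a `TreeMap`; the class name is hypothetical, and it only looks at file names, so it cannot tell whether the offset is actually present in that file.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: pick the .log segment that should contain `offset`,
// using only the file names from the partition directory.
final class SegmentLookup {

    /**
     * Returns the .log file whose base offset is the largest one <= offset,
     * or null if no segment starts at or before that offset.
     */
    static String segmentFor(List<String> fileNames, long offset) {
        TreeMap<Long, String> segments = new TreeMap<>();
        for (String name : fileNames) {
            if (name.endsWith(".log")) {
                // Segment names are the zero-padded base offset, e.g. 00000000000040268605.log
                long base = Long.parseLong(name.substring(0, name.length() - ".log".length()));
                segments.put(base, name);
            }
        }
        Map.Entry<Long, String> entry = segments.floorEntry(offset);
        return entry == null ? null : entry.getValue();
    }
}
```

With the listing above and offset 40382641, this picks 00000000000040268605.log, which is the file dumped on broker 105 below. Other brokers roll their segments independently, so repeat the lookup against each broker's own listing.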

Then, using kafka-dump-log, you can inspect the log file to check for faulty records. Here I'm only using --index-sanity-check because I just want to check whether a valid copy of the corrupted data exists somewhere. Inspecting the logs on both broker 105 and broker 102, it looks like the records from 40382641 to 40382647 are invalid on broker 105 but valid on broker 102. If all your replicas have corrupted data, you had better hope you have backups...

[admin@broker105:~]$ kafka-dump-log --files "/var/lib/kafka/data/my-topic-0/00000000000040268605.log" --index-sanity-check
Dumping /var/lib/kafka/data/my-topic-0/00000000000040268605.log
Log starting offset: 40268605 
baseOffset: 40382641 lastOffset: 40382647 count: 7 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 57 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 311110755 CreateTime: 1714629806898 size: 15025 magic: 2 compresscodec: none crc: 3003617146 isvalid: false
Exception in thread "main" org.apache.kafka.common.errors.CorruptRecordException: Found record size 0 smaller than minimum record overhead (14) in file /var/lib/kafka/data/my-topic-0/00000000000040268605.log

[admin@broker102:~]$ kafka-dump-log --files "/var/lib/kafka/data/my-topic-0/00000000000040046093.log" --index-sanity-check
baseOffset: 40382641  lastOffset: 40382647 count: 7 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 57 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 893585739 CreateTime: 1714629806898 size: 15025 magic: 2 compresscodec: none crc: 3003617146 isvalid: true
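When the dumps are long, comparing them by eye gets tedious. A minimal sketch, assuming the batch-header lines printed above (with `baseOffset:` and `isvalid:` fields); that format is tool output rather than a documented interface, and the class name is hypothetical. It lists the batches that are invalid on the suspect broker but valid on the reference broker, i.e. the ones you can still recover from the good replica.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical comparison of two kafka-dump-log outputs, one per broker.
final class DumpCompare {
    private static final Pattern BATCH =
        Pattern.compile("baseOffset:\\s*(\\d+).*?isvalid:\\s*(true|false)");

    /** Maps baseOffset -> isvalid for every batch header line in a dump. */
    static Map<Long, Boolean> validity(List<String> dumpLines) {
        Map<Long, Boolean> result = new LinkedHashMap<>();
        for (String line : dumpLines) {
            Matcher m = BATCH.matcher(line);
            if (m.find()) {
                result.put(Long.parseLong(m.group(1)), Boolean.parseBoolean(m.group(2)));
            }
        }
        return result;
    }

    /** Base offsets of batches invalid in `suspect` but valid in `reference`. */
    static List<Long> recoverable(List<String> suspect, List<String> reference) {
        Map<Long, Boolean> bad = validity(suspect);
        Map<Long, Boolean> good = validity(reference);
        List<Long> out = new ArrayList<>();
        for (Map.Entry<Long, Boolean> e : bad.entrySet()) {
            if (!e.getValue() && Boolean.TRUE.equals(good.get(e.getKey()))) {
                out.add(e.getKey());
            }
        }
        return out;
    }
}
```

Fed with the two dumps above (broker 105 as suspect, broker 102 as reference), this reports the batch at 40382641. An empty result while the suspect dump still has invalid batches means no healthy copy was found on that reference broker.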

At this point, if you just want to restart your consumers and do not need the corrupted records to be recovered, you could simply skip to the next offset after the corrupted batch. Here I would need to skip to offset 40382648.
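The offset to skip to is the batch's lastOffset plus one (40382647 + 1 = 40382648 here). One way to move a group's committed offset is kafka-consumer-groups with `--reset-offsets --to-offset ... --execute`, which only works while the group has no active members; inside the application, `KafkaConsumer.seek(TopicPartition, long)` does the same thing. The sketch below only assembles that command line: the class name and the group name in the usage are hypothetical placeholders, and the command name follows the naming used in this answer (the Apache tarball ships it as kafka-consumer-groups.sh). For a Kafka Streams application, the group id is the application.id.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: build a kafka-consumer-groups invocation that moves a group's
// committed offset for one partition past a corrupted batch.
final class SkipCorruptBatch {

    /** The first offset after the corrupted batch. */
    static long nextOffset(long lastOffsetOfCorruptBatch) {
        return lastOffsetOfCorruptBatch + 1;
    }

    /** Arguments for the reset; run them only while the group has no active members. */
    static List<String> resetCommand(String bootstrap, String group, String topic,
                                     int partition, long lastOffsetOfCorruptBatch) {
        return Arrays.asList(
            "kafka-consumer-groups",
            "--bootstrap-server", bootstrap,
            "--group", group,
            "--topic", topic + ":" + partition, // limit the reset to one partition
            "--reset-offsets",
            "--to-offset", Long.toString(nextOffset(lastOffsetOfCorruptBatch)),
            "--execute"); // without --execute the tool only performs a dry run
    }
}
```

The resulting list can be passed to `new ProcessBuilder(...)`, or simply printed and run by hand after stopping the consumers.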

2- Recovery

The idea is to force a partition leader change so that the broker with valid data serves it, and the one with corrupted data becomes the secondary. The corrupted data will still be there, just unused. In the next step, we'll see how to clean it up.

The kafka-reassign-partitions utility lets you manually change how partitions are distributed across your brokers. You need a reassignment JSON file describing the new assignment. In this example, I simply swapped brokers 105 and 102 in the replica list, so that 102 becomes the preferred leader and 105 the secondary. I first ran --verify to make sure I had entered the right partition.
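The preferred leader is the first broker in a partition's replica list, so promoting the healthy replica amounts to moving it to the front. A minimal sketch that does this and renders a reassignment file in the same shape as manual-reassign.json below; the class name is hypothetical, and the JSON is built by hand to stay dependency-free (no escaping is needed because Kafka topic names only allow letters, digits, '.', '_' and '-').

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper: put the broker holding the valid data first in the replica
// list (making it the preferred leader) and render the reassignment JSON.
final class PreferredLeaderSwap {

    static List<Integer> withPreferredLeader(List<Integer> replicas, int healthyBroker) {
        if (!replicas.contains(healthyBroker)) {
            throw new IllegalArgumentException(healthyBroker + " is not a replica");
        }
        List<Integer> result = new ArrayList<>();
        result.add(healthyBroker); // first entry = preferred leader
        for (int b : replicas) {
            if (b != healthyBroker) {
                result.add(b); // keep the remaining replicas in their original order
            }
        }
        return result;
    }

    static String reassignmentJson(String topic, int partition, List<Integer> replicas) {
        StringJoiner ids = new StringJoiner(",", "[", "]");
        for (int b : replicas) {
            ids.add(Integer.toString(b));
        }
        return "{\"version\":1,\"partitions\":[{\"topic\":\"" + topic
            + "\",\"partition\":" + partition + ",\"replicas\":" + ids + "}]}";
    }
}
```

Starting from replicas [105, 102] with healthy broker 102, this produces exactly the JSON written to manual-reassign.json in the next command.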

$ echo '{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[102,105]}]}' >manual-reassign.json
$ kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file manual-reassign.json --verify
Status of partition reassignment:
There is no active reassignment of partition my-topic-0, but replica set is 105,102 rather than 102,105.

Clearing broker-level throttles on brokers 101,110,106,102,109,105,112,104,107,111,103,108
Clearing topic-level throttles on topic my-topic

$ kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file manual-reassign.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[102,105],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignment for my-topic-0

You have now changed the preferred leader for my-topic-0, but broker 102 is not the leader yet: an election needs to be triggered. This can be done using kafka-leader-election.

$ kafka-leader-election --bootstrap-server localhost:9092 --topic my-topic --partition 0 --election-type PREFERRED
Successfully completed leader election (PREFERRED) for partitions my-topic-0
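A PREFERRED election only moves leadership when the preferred replica (the first in the list) is not already the leader and is in the ISR; otherwise the tool reports an error for that partition instead of electing it. A minimal sketch of that precondition, using the values from the describe output; the class name is hypothetical, and the rule is simplified (it ignores broker liveness and other controller checks).

```java
import java.util.List;

// Hypothetical, simplified precondition check for a PREFERRED leader election.
final class PreferredElectionCheck {

    enum Outcome { NOT_NEEDED, WILL_MOVE, PREFERRED_NOT_IN_ISR }

    static Outcome evaluate(int currentLeader, List<Integer> replicas, List<Integer> isr) {
        int preferred = replicas.get(0); // the preferred leader is the first assigned replica
        if (preferred == currentLeader) {
            return Outcome.NOT_NEEDED;
        }
        // The preferred replica must be in sync, otherwise it cannot take over.
        return isr.contains(preferred) ? Outcome.WILL_MOVE : Outcome.PREFERRED_NOT_IN_ISR;
    }
}
```

Before the election above, partition 0 had leader 105, replicas [102, 105] and ISR [102, 105], which evaluates to WILL_MOVE. If 102 had dropped out of the ISR, the election would not have moved leadership.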

At this point, you can restart your consumer from the offset that used to be corrupted. Since the new partition leader has valid data, it should start without issue and catch up on its lag! However, your corrupted data is not gone yet. It is still on the secondary broker and needs to be cleaned up: there is no point in having a replica if it holds corrupted data.

3- Cleaning corrupted data

We can use the same reassignment process with kafka-reassign-partitions. This time, however, we won't simply change the preferred leader; we will change the secondary broker. This way, the new secondary broker replicates the data from the leader, and the old secondary broker eventually deletes its copy of the partition, corrupted data included. Here I move the replica from broker 105 to broker 107.
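Replacing the corrupted secondary keeps the healthy leader in first position and substitutes the target broker for the bad one. A minimal sketch with a hypothetical class name; it refuses to replace the preferred leader (move leadership away first, as in the previous step) and refuses a target that already holds a replica. The resulting list goes into a reassignment file of the same shape as change-replica.json below.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: substitute a new broker for the corrupted follower while
// keeping every other position, so the preferred leader (first entry) is unchanged.
final class ReplaceReplica {

    static List<Integer> replace(List<Integer> replicas, int corruptedBroker, int newBroker) {
        if (!replicas.contains(corruptedBroker)) {
            throw new IllegalArgumentException(corruptedBroker + " is not a replica");
        }
        if (replicas.get(0) == corruptedBroker) {
            throw new IllegalArgumentException("move leadership away from " + corruptedBroker + " first");
        }
        if (replicas.contains(newBroker)) {
            throw new IllegalArgumentException(newBroker + " already holds a replica");
        }
        List<Integer> result = new ArrayList<>(replicas.size());
        for (int b : replicas) {
            result.add(b == corruptedBroker ? newBroker : b);
        }
        return result;
    }
}
```

For replicas [102, 105], replacing 105 with 107 gives [102, 107], matching the file written in the next command.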

$ echo '{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[102,107]}]}' >change-replica.json
$ kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file change-replica.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[102,105],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignment for my-topic-0

Using the --verify flag, you can follow the progress of the replication.

$ kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file change-replica.json --verify
Status of partition reassignment:
Reassignment of partition my-topic-0 is still in progress.

$ kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file change-replica.json --verify
Status of partition reassignment:
Reassignment of partition my-topic-0 is completed.

Clearing broker-level throttles on brokers 101,110,106,102,109,105,112,104,107,111,103,108
Clearing topic-level throttles on topic my-topic
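If you script the wait instead of re-running --verify by hand, you could look for the status phrases printed above. A minimal sketch that classifies the --verify output for one partition; matching human-readable text is brittle and may break between Kafka versions, and the class name is hypothetical. The Admin API's `listPartitionReassignments` is the sturdier option if you are already writing Java.

```java
import java.util.List;

// Hypothetical classifier for `kafka-reassign-partitions --verify` output.
final class ReassignmentStatus {

    enum State { IN_PROGRESS, COMPLETED, UNKNOWN }

    static State of(String topicPartition, List<String> verifyOutput) {
        String prefix = "Reassignment of partition " + topicPartition + " is ";
        for (String raw : verifyOutput) {
            String line = raw.trim();
            if (line.startsWith(prefix + "still in progress")) {
                return State.IN_PROGRESS;
            }
            if (line.startsWith(prefix + "completed")) {
                return State.COMPLETED;
            }
        }
        return State.UNKNOWN; // partition not mentioned, or the wording changed
    }
}
```

A polling loop would re-run the command, feed its output lines to `of("my-topic-0", lines)`, and sleep between attempts until it returns COMPLETED.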

Now you can verify that your new replica contains valid data, using kafka-dump-log as in the diagnosis step.

Also, note that instead of reassigning the partition to a different broker, you could try to delete the corrupted replica's files directly on the secondary broker. However, it's generally better to use the public APIs and tools provided with Kafka rather than messing with the internal files, unless you really know what you're doing.

Cheers!

Hooker answered 3/5, 2024 at 20:18 Comment(0)
