Producing a Kafka message with a Null Value (Tombstone) from the Console
Asked Answered
L

3

7

Is there any way to produce a message in the kafka-console-producer with a null value (ie. mark it for the compactor to delete it with a tombstone)?

I've tried producing "mykey" and "mykey|". The former produces an error and the later makes the value the empty string. Running producer like this:

$KAFKA_HOME/bin/kafka-console-producer --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=|"
Lizbeth answered 28/8, 2018 at 12:11 Comment(0)
W
3

Unfortunately, there is no way to do that using console-producer

this is a code snippet from ConsoleProducer class (how it reads the data). Kafka 0.11.0 (don't think that it was changed significantly between different versions).

override def readMessage() = {
  lineNumber += 1
  print(">")
  (reader.readLine(), parseKey) match {
    case (null, _) => null
    case (line, true) =>
      line.indexOf(keySeparator) match {
        case -1 =>
          if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
          else throw new KafkaException(s"No key found on line $lineNumber: $line")
        case n =>
          val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
          new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
      }
    case (line, false) =>
      new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
  }
}

as you can see, the value is always an non-nullable array of bytes

Walkthrough answered 28/8, 2018 at 12:22 Comment(3)
It has changed in Kafka 2.0 to Java code from ScalaLafferty
Thanks Natalia, looks like you are correct in that it hasn't changed in the latest version. Don't think I can pass a UTF-8 null character for getBytes to parse either. I'm a bit surprised they handled "case -1" that way, since the user has to explicitly pass "parseKey" to get into that block. Thus, i'd think it should be clear that it's the key being passed and the value could be assigned null. Regardless, rebuilding with change should be easy. Thanks for the help!Lizbeth
Vote for enhancement: issues.apache.org/jira/browse/KAFKA-10238Duplessismornay
O
11

Have a look at kafkacat (the netcat of kafka). Quoting the doc:

Produce a tombstone (a "delete" for compacted topics) for key "abc" by providing an empty message value which -Z interpretes as NULL:

echo "abc:" | kafkacat -b mybroker -t mytopic -Z -K:
Oratorio answered 23/1, 2020 at 12:50 Comment(3)
FYI kafkacat does not currently support the ability to produce avro messagesCauserie
@MarioP. I do it everyday: kafkacat -b mybroker -t topic -s avro -r http://schema-registry-url:8080Oratorio
If you have multiple partitions and want to use the same partitioning strategy as Java you also need -X partitioner=murmur2Wiredraw
W
3

Unfortunately, there is no way to do that using console-producer

this is a code snippet from ConsoleProducer class (how it reads the data). Kafka 0.11.0 (don't think that it was changed significantly between different versions).

override def readMessage() = {
  lineNumber += 1
  print(">")
  (reader.readLine(), parseKey) match {
    case (null, _) => null
    case (line, true) =>
      line.indexOf(keySeparator) match {
        case -1 =>
          if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
          else throw new KafkaException(s"No key found on line $lineNumber: $line")
        case n =>
          val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
          new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
      }
    case (line, false) =>
      new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
  }
}

as you can see, the value is always an non-nullable array of bytes

Walkthrough answered 28/8, 2018 at 12:22 Comment(3)
It has changed in Kafka 2.0 to Java code from ScalaLafferty
Thanks Natalia, looks like you are correct in that it hasn't changed in the latest version. Don't think I can pass a UTF-8 null character for getBytes to parse either. I'm a bit surprised they handled "case -1" that way, since the user has to explicitly pass "parseKey" to get into that block. Thus, i'd think it should be clear that it's the key being passed and the value could be assigned null. Regardless, rebuilding with change should be easy. Thanks for the help!Lizbeth
Vote for enhancement: issues.apache.org/jira/browse/KAFKA-10238Duplessismornay
G
2

With this PR the feature was implemented. You can produce a message with Null value like this:

$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --property null.marker=NULL

See the KIP's confluence page for docs

Giustino answered 31/10, 2023 at 15:2 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.