How to view Kafka headers

5

34

We are sending messages with headers to Kafka using org.apache.kafka.clients.producer.ProducerRecord:

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, (Long)null, key, value, headers);
}

How can I see these headers from the command line? kafka-console-consumer.sh shows me only the payload, not the headers.

Stalwart answered 15/3, 2019 at 10:33 Comment(0)
50

You can use the excellent kafkacat tool.

Sample command:

kafkacat -b kafka-broker:9092 -t my_topic_name -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

Sample output:

Key (-1 bytes):
  Value (13 bytes): {foo:"bar 5"}
  Timestamp: 1548350164096
  Partition: 0
  Offset: 34
  Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALUE_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Converting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

The kafkacat header option (%h) is only available in recent builds of kafkacat; if your current version doesn't include it, you may want to build from the master branch yourself.


You can also run kafkacat from Docker:

docker run --rm edenhill/kafkacat:1.5.0 \
      -b kafka-broker:9092 \
      -t my_topic_name -C \
      -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

If you use Docker, bear in mind the network implications of how to reach the Kafka broker from inside the container.

Pavid answered 15/3, 2019 at 11:51 Comment(4)
Thanks a lot for this. - Stalwart
Does it work with Kafka 2.3.0? My kafkacat is 1.3.1 and it returns % ERROR: Unsupported formatter: %h - Dropline
Works fine against AK 2.3. I'm using kafkacat Version 1.3.1-52-gc7986d (JSON) (librdkafka 1.0.0 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins). Did you try the build from master? - Pavid
Besides being a great tool (thanks @RobinMoffatt!), another good thing about it is that it can be installed via apt-get and brew. - Clump
39

Starting with Kafka 2.7.0, you can enable header printing in kafka-console-consumer.sh by passing the property print.headers=true:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --from-beginning --topic quickstart-events \
  --property print.key=true \
  --property print.headers=true \
  --property print.timestamp=true

Litigant answered 19/1, 2021 at 11:19 Comment(0)
11

You can also use kafkactl for this. For example, with YAML output:

kafkactl consume my-topic --print-headers -o yaml

Sample output:

partition: 1
offset: 22
headers:
  key1: value1
  key2: value2
value: my-value

Disclaimer: I am a contributor to this project.

Merwin answered 13/11, 2019 at 20:50 Comment(1)
kafkactl is easy to install on CentOS 7 and easy to use, thank you. - Peraea
7

From the kafka-console-consumer.sh script:

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

src: https://github.com/apache/kafka/blob/2.1.1/bin/kafka-console-consumer.sh

In kafka.tools.ConsoleConsumer (as of 2.1.1), the headers are passed to the formatter, but none of the existing formatters makes use of them:

formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
                                     msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers),
                                     output)

src: https://github.com/apache/kafka/blob/2.1.1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala

At the bottom of the file linked above you can see the existing formatters.

If you want to print headers, you need to implement your own kafka.common.MessageFormatter, and in particular its writeTo method:

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit

and then run your console consumer with --formatter, passing the name of your formatter class (which must also be on the classpath).

Another, simpler and faster way would be to write your own mini-program using KafkaConsumer and inspect the headers in a debugger.
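As a rough sketch of what such a mini-program would do with each record's headers, the snippet below renders header key/value pairs as a single line of text. To keep it runnable without the kafka-clients dependency, it uses a stand-in class, RawHeader, in place of org.apache.kafka.common.header.Header. The names HeaderDump, RawHeader and format are hypothetical and not part of Kafka, and decoding values as UTF-8 is an assumption that only holds if your producer wrote text.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Kafka): renders header pairs as text.
final class HeaderDump {

    // Stand-in for org.apache.kafka.common.header.Header, which exposes
    // key() and value(); used here so the sketch compiles on a plain JDK.
    static final class RawHeader {
        final String key;
        final byte[] value; // a Kafka header value may be null

        RawHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Formats headers as "k1=v1,k2=v2".
    // Assumption: header values are UTF-8 text; null values print as NULL.
    static String format(List<RawHeader> headers) {
        List<String> parts = new ArrayList<>();
        for (RawHeader h : headers) {
            String v = (h.value == null)
                    ? "NULL"
                    : new String(h.value, StandardCharsets.UTF_8);
            parts.add(h.key + "=" + v);
        }
        return String.join(",", parts);
    }

    public static void main(String[] args) {
        // Made-up sample headers, for illustration only.
        List<RawHeader> headers = Arrays.asList(
                new RawHeader("trace-id", "abc123".getBytes(StandardCharsets.UTF_8)),
                new RawHeader("empty", null));
        System.out.println("Headers: " + format(headers));
    }
}
```

In a real program you would poll a KafkaConsumer and, for each ConsumerRecord, iterate over record.headers(), reading key() and value() from each Header instead of building RawHeader objects by hand.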

Devorahdevore answered 15/3, 2019 at 11:24 Comment(0)
2
kcat -C -b $brokers -t $topic -f 'key: %k Headers: %h: Message value: %s\n'
Gharry answered 21/3, 2022 at 17:58 Comment(2)
What is kcat? Could you provide a pointer? - Fortuna
DOH! docs.confluent.io/platform/current/tools/kafkacat-usage.html - Fortuna
