Is there a way to add headers in kafka-console-producer.sh
Asked Answered
M

3

17

I'd like to use the kafka-console-producer.sh to fire a few JSON messages with Kafka headers.

Is this possible?

docker exec -it kafka_1 /opt/kafka_2.12-2.3.0/bin/kafka-console-producer.sh --broker-list localhost:9093 --topic my-topic --producer.config /opt/kafka_2.12-2.3.0/config/my-custom.properties
Michigan answered 5/11, 2019 at 17:38 Comment(1)
Possible duplicate of How to produce messages with headers in Kafka 0.11 using console producer?Piss
L
16

No, but you can with kafkacat's -H argument:

Produce:

echo '{"col_foo":1}'|kafkacat -b localhost:9092 -t test -P -H foo=bar

Consume:

kafkacat -b localhost:9092 -t test -C -f '-----\nTopic %t[%p]\nOffset: %o\nHeaders: %h\nKey: %k\nPayload (%S bytes): %s\n'
-----
Topic test[0]
Offset: 0
Headers: foo=bar
Key:
Payload (9 bytes): col_foo:1
% Reached end of topic test [0] at offset 1
Laundryman answered 5/11, 2019 at 19:29 Comment(1)
Now it's called kcat, just for the record :)Heterotopia
G
10

Starting from kafka 3.1.0, there is an option to turn on headers parsing parse.headers=true and then you just place them before your record value. Info from docs:

| parse.headers=true:
|  "h1:v1,h2:v2...\tvalue"

So your command will look like this:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_name --property parse.headers=true

and then you should pass:

header_name:header_value\trecord_value

Galagalactagogue answered 21/7, 2022 at 11:22 Comment(5)
for what it's worth. The default property for headers.delimiter is \t. After some confusion. This command will need --property headers.delimiter='\n' passed along as well otherwise the CLI complains with exceptions regarding the header delimiter.Linskey
I am getting No headers delimiter found on line number 1: with this message "key:value\t sa"Apia
cwiki.apache.org/confluence/display/KAFKA/…Semblance
Can you add one example with header as json payload and value as json payload as well.Ceuta
I had to add --property headers.delimiter=\t. Wasn't defined as default on my system 🤷Adelleadelpho
S
0

Consumer producer command:

docker exec -it container_id kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092 --property parse.headers=true --property headers.key.separator=: --property headers.delimiter=\t

Payload format:

header_name:header_value t payload_value

Console consumer command:

docker exec -it container_id kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --property print.headers=true
Stadler answered 13/7, 2023 at 16:0 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.