How to produce messages to selected partition using kafka-console-producer?
B

5

22

According to the Kafka documentation:

The producer is responsible for choosing which message to assign to which partition within the topic.

How can I send messages to a selected partition using kafka-console-producer.sh?

I would like to specify some sort of 'partition id' when sending a message.

Bergquist answered 24/10, 2014 at 17:46 Comment(0)
K
3

kafka-console-producer.sh doesn't support producing messages to a particular partition out of the box.

However, it should be fairly straightforward to update the script to accept an extra parameter for the partition id, and then handle that parameter in a custom partitioner (as described in the post by @Chiron) within a modified version of the kafka.tools.ConsoleProducer class.

Take a look at the source code at:

https://apache.googlesource.com/kafka/+/refs/heads/trunk/bin/kafka-console-producer.sh
https://apache.googlesource.com/kafka/+/refs/heads/trunk/core/src/main/scala/kafka/tools/ConsoleProducer.scala

Knavish answered 27/10, 2014 at 17:30 Comment(0)
C
21

Targeting a specific partition is not possible, but the ConsoleProducer does support writing keyed messages to the topic.

With the default behaviour, Kafka uses the hash of the key to choose the partition for each message.

Currently, the default separator is \t, so entering key[\t]message will distribute it amongst partitions:

key1    a-message

The separator can be changed by providing the key.separator configuration, for example:

kafka-console-producer --broker-list localhost:9092,localhost:9093 \
  --topic mytopic --property key.separator=,

Send messages like this:

key2,another-message

I have tested this with the default tab and a custom separator successfully. The messages were distributed to two separate partitions.
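
As a rough illustration of how a typed line becomes a key and a value, here is a minimal Python sketch. It assumes that the line is split at the first occurrence of the separator only, so the value may itself contain the separator. The function name `split_keyed_line` is hypothetical, and raising an error for a line without a separator is this sketch's choice, not a statement about the tool.

```python
from typing import Tuple


def split_keyed_line(line: str, separator: str = "\t") -> Tuple[str, str]:
    """Split one console line into (key, value) at the first separator only."""
    key, found, value = line.partition(separator)
    if not found:
        # Assumption of this sketch: a line without a separator is rejected.
        raise ValueError(f"no key separator {separator!r} in line {line!r}")
    return key, value


# Default tab separator.
print(split_keyed_line("key1\ta-message"))
# Custom separator; everything after the first comma stays in the value.
print(split_keyed_line("key2,another,message", separator=","))
```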

Camelback answered 26/4, 2016 at 15:1 Comment(4)
It worked for me too. The value can also contain the separator: the producer just looks for the first separator position and splits the record into two tokens. – Know
How can I retrieve the messages in the console consumer according to their partitions? – Cuirassier
Consider using something like kafkacat. – Camelback
kafka-console-consumer accepts the print.key property. You can also customize the string that separates the key and value in the output with key.separator: --property print.key=true --property key.separator=" - " – Willed
A
17

As of Kafka >= 0.10.0.1, the kafka-console-producer.sh script and the underlying ConsoleProducer class support sending data with a partition key, but this support is disabled by default and has to be enabled from the CLI.

Note that the partition key is not necessarily the same as the "partition id". By default, the partition is calculated from the key by the default partitioner (kafka.producer.DefaultPartitioner in the old Scala producer, org.apache.kafka.clients.producer.internals.DefaultPartitioner in the Java producer). To reach a given partition you would either need to replace that partitioner (which means adding the new Partitioner implementation to the classpath that kafka-console-producer.sh uses) or work out which partition id a given partition key maps to, depending on your use case and what your readers expect.
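
To make the "work out the partition id from the key" option more concrete, here is a hedged Python sketch. It assumes the Java producer's default behaviour for keyed records: a 32-bit MurmurHash2 of the key bytes, made non-negative, modulo the number of partitions. It also assumes the key is sent as UTF-8 bytes. The old Scala producer hashes differently, and the function names are hypothetical, so treat the result as a prediction to verify against your own cluster rather than a guarantee.

```python
def murmur2(data: bytes) -> int:
    """32-bit MurmurHash2 with the seed assumed to match Kafka's Java client."""
    mask = 0xFFFFFFFF
    m = 0x5BD1E995
    length = len(data)
    h = (0x9747B28C ^ length) & mask

    # Mix the input four bytes at a time, read as little-endian integers.
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> 24
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Fold in the remaining one to three bytes, if any.
    tail = data[length - length % 4:]
    if tail:
        h ^= int.from_bytes(tail, "little")
        h = (h * m) & mask

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: str, num_partitions: int) -> int:
    """Predict the partition for a key (assumes UTF-8 key bytes)."""
    return (murmur2(key.encode("utf-8")) & 0x7FFFFFFF) % num_partitions


def find_key_for_partition(target: int, num_partitions: int,
                           prefix: str = "key-", max_tries: int = 100_000) -> str:
    """Search for a key of the form prefix + counter that maps to target."""
    if not 0 <= target < num_partitions:
        raise ValueError("target partition out of range")
    for i in range(max_tries):
        key = f"{prefix}{i}"
        if partition_for_key(key, num_partitions) == target:
            return key
    raise RuntimeError("no matching key found within max_tries")


# Example: look for a key expected to land in partition 1 of a 3-partition topic.
print(find_key_for_partition(target=1, num_partitions=3))
```

The mapping depends on the partition count, so a key found this way stops pointing at the same partition if partitions are later added to the topic.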

In any case, to enable key parsing for the messages you need to set the property parse.key to true (it is false by default). If you want a separator other than the tab character, set key.separator as described in Cedric's answer.

In the end, the command line would be:

kafka-console-producer.sh --broker-list kafka:9092,kafka2:9092 \
    --topic $TOPIC --property parse.key=true --property "key.separator=|"
Antiquate answered 20/12, 2016 at 9:58 Comment(2)
This sends a key, sure, but that doesn't guarantee a "specific partition" as the OP wanted. – Waiter
Thanks, I re-edited to try to make the answer, hopefully, a bit clearer. – Antiquate
M
8

Your starting point is the partitioner.class setting in your Properties instance. In Kafka, the default implementation is kafka.producer.DefaultPartitioner.

The goal of that setting is:

The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.

This means that if you want to change the behaviour of the default partitioner, for example to target a specific partition, you need to create your own implementation of the kafka.producer.Partitioner interface.

I would suggest being very careful when creating your own strategy: test it thoroughly, and monitor your topics and their partitions.

Once it is built as a JAR, add the JAR's path to the CLASSPATH environment variable so that kafka-console-producer can find and load it.
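
Before writing the JVM class, the selection logic of such a strategy can be prototyped and tested on its own. The Python sketch below is for illustration only: a real partitioner must be a Java or Scala class implementing the partitioner interface. The name `explicit_partition` is hypothetical. It shows one possible strategy, which treats the message key as a literal partition id and rejects anything out of range.

```python
def explicit_partition(key: str, num_partitions: int) -> int:
    """Interpret the record key as the partition id itself (one possible strategy)."""
    try:
        partition = int(key)
    except ValueError:
        raise ValueError(f"key {key!r} is not a numeric partition id") from None
    if not 0 <= partition < num_partitions:
        raise ValueError(
            f"partition {partition} out of range for {num_partitions} partitions"
        )
    return partition


# With this strategy, a keyed line such as "2<TAB>payload" would target partition 2.
print(explicit_partition("2", num_partitions=4))
```

Failing loudly on invalid keys is a design choice of this sketch. Silently falling back to hashing would hide mistakes in the typed partition id.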

Mccay answered 27/10, 2014 at 12:42 Comment(0)
P
0
C:\arunsingh\demo\kafka_2.13-2.4.0\bin\windows>kafka-console-producer.bat --broker-list 127.0.0.1:9094 --topic arun_topic --property parse.key=true --property key.separator=, --producer-property acks=all
>myKey1, Message with key
>myKey2, Message with key 2
>
Plashy answered 21/4, 2020 at 10:39 Comment(2)
But how do we include the key in the stored message as well? (Worst case, I can duplicate it.) – Virtually
This doesn't answer the question about how to send data to a specific partition, and doesn't add to the existing answers. – Waiter
