I found the accepted answer a bit "thin", so maybe this will help others.
There are two important things in your code:

```java
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
// Send message
producer.send(producerRecord).get();
```

`producer.send(producerRecord)` consists of two parts: a blocking one and a non-blocking one. The blocking part itself has several steps:
- Fetch metadata about the cluster from the brokers
- Serialize the message
- Choose a partition
- Place the message in the `RecordAccumulator`
Now, usually, the first three steps are fast (the first one is cached after the initial call), while the fourth one can take time. That is because the `RecordAccumulator` has limited space (`buffer.memory`), and how much of it is free at any moment depends on another thread in the producer client (the so-called *Sender Thread*). If that thread is doing a bad job of retrieving batches from the `RecordAccumulator` and sending them to the brokers (there are metrics for all of this, btw), your `send` call will block until space becomes available in the `RecordAccumulator`.
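The two settings that govern this blocking behavior can be sketched as a minimal producer configuration; the broker address is a made-up placeholder, and the values shown are the client defaults:

```java
import java.util.Properties;

public class ProducerBlockingConfig {
    public static Properties build() {
        Properties props = new Properties();
        // hypothetical broker address, for illustration only
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // total memory available to the RecordAccumulator; send() blocks when it is full
        props.put("buffer.memory", "33554432");   // 32 MB (the default)
        // upper bound on how long send() may block (metadata fetch + buffer wait)
        props.put("max.block.ms", "60000");       // 60 s (the default)
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.block.ms"));
    }
}
```

With these defaults, a `send` that cannot find buffer space will block for up to 60 seconds and then fail with a timeout.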
All four of these steps together are allowed to block for at most `max.block.ms`. And this is what the KIP means when it talks about:
- metadata fetch time (fetching metadata about the cluster from the brokers)
- buffer-full block time (the waiting I described above)
- serialization time (customized serializer)
- partitioning time (customized partitioner)
There is also `delivery.timeout.ms`. It is the total time to wait for a message to be delivered to its partition, and it includes: the time the record spends in a batch (in the `RecordAccumulator`) + the time to send the batch to the broker, including all retries, if any + the time to get the acks (for example, with `acks=all`, waiting for the message to be replicated across the replicas). You can think of it as the time from the `send` call until the message has reached all replicas and the acks have come back. All of this has to be lower than `delivery.timeout.ms`.
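Since `delivery.timeout.ms` must cover both the batching wait and the per-request wait, the Kafka documentation notes it should be at least `linger.ms` + `request.timeout.ms` (the client rejects configurations that violate this). A tiny sketch of that constraint, using the default values:

```java
public class DeliveryTimeoutCheck {
    // The producer requires: delivery.timeout.ms >= linger.ms + request.timeout.ms
    static boolean isValid(long deliveryTimeoutMs, long lingerMs, long requestTimeoutMs) {
        return deliveryTimeoutMs >= lingerMs + requestTimeoutMs;
    }

    public static void main(String[] args) {
        // defaults: delivery.timeout.ms=120000, linger.ms=0, request.timeout.ms=30000
        System.out.println(isValid(120_000, 0, 30_000));   // true
    }
}
```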
Before explaining `request.timeout.ms`, imho it's important to understand what `max.in.flight.requests.per.connection` is, because the two are somewhat connected. Suppose a batch is ready to be sent to the broker from the `RecordAccumulator` (either because `batch.size` was reached or `linger.ms` expired). Whether this batch gets picked up by the so-called "Sender Thread" (a thread inside the client itself, distinct from the thread that calls the `send` method) depends on `max.in.flight.requests.per.connection`.
You can have at most `max.in.flight.requests.per.connection` concurrent requests active per connection at any point in time. A slightly easier way to think about it is this: the "Sender Thread" runs a loop that, in pseudo-code, looks like:
```java
while (true) {
    // check if there are batches ready to send
    // take the batches to send to the brokers
    // make requests to the brokers
    // poll connections for responses
    // etc.
}
```
Suppose this is the first batch to send. The "Sender Thread" increments its in-flight request counter (so it becomes 1), takes the batch and sends it to the broker. It does not wait for an acknowledgment at this point, but goes back to its loop; and so on, until the counter reaches 5 (the default value of `max.in.flight.requests.per.connection`).

Now suppose there is another batch to send. The Sender Thread is not going to take it, since there are no request slots available (we are at the maximum of 5). Instead it will "poll connections": it asks the broker for the results of the previously sent requests, and each response it receives frees a slot for a new request.
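The in-flight accounting above can be sketched as a toy simulation; this is not the client's actual code, just an illustration of how the cap stops further batches from being taken:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SenderLoopSketch {
    static final int MAX_IN_FLIGHT = 5;   // max.in.flight.requests.per.connection default

    // One pass of the sender loop: drain ready batches until the in-flight
    // cap is reached; returns how many batches were handed to the network.
    static int drainOnce(Deque<String> readyBatches, int inFlight) {
        int sent = 0;
        while (!readyBatches.isEmpty() && inFlight + sent < MAX_IN_FLIGHT) {
            readyBatches.poll();   // request sent to the broker
            sent++;                // no ack awaited at this point
        }
        return sent;
    }

    public static void main(String[] args) {
        Deque<String> ready = new ArrayDeque<>();
        for (int i = 1; i <= 7; i++) ready.add("batch-" + i);

        int sent = drainOnce(ready, 0);
        // 5 requests go out; the remaining batches wait until polling
        // connections returns acks and frees in-flight slots
        System.out.println("sent=" + sent + ", waiting=" + ready.size());
    }
}
```

With 7 ready batches and an empty connection, only 5 go out; the other 2 stay queued until acks come back.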
With all this background it's time to look at `request.timeout.ms`, which is actually pretty easy now. When the client polls connections, i.e. tries to get a response from the broker for each in-flight request, it will wait at most `request.timeout.ms` per request (30 seconds by default). This value is reset on every retry.
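Putting the two timeouts together: `request.timeout.ms` bounds a single attempt, while `delivery.timeout.ms` bounds the whole delivery including retries. A rough back-of-the-envelope sketch (not the client's actual retry algorithm) of how many attempts fit in the budget, using the defaults plus `retry.backoff.ms` between attempts:

```java
public class RetryBudgetSketch {
    // Rough illustration only: each attempt may take up to request.timeout.ms,
    // with retry.backoff.ms between attempts, all inside delivery.timeout.ms.
    static long roughMaxAttempts(long deliveryTimeoutMs,
                                 long requestTimeoutMs,
                                 long retryBackoffMs) {
        return deliveryTimeoutMs / (requestTimeoutMs + retryBackoffMs);
    }

    public static void main(String[] args) {
        // defaults: delivery.timeout.ms=120000, request.timeout.ms=30000, retry.backoff.ms=100
        System.out.println(roughMaxAttempts(120_000, 30_000, 100));
    }
}
```

So with the defaults, a request that keeps timing out gets only a handful of full-length attempts before `delivery.timeout.ms` expires and `send` fails the record.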