Apache Kafka Producer Config: 'request.timeout.ms' VS. 'max.block.ms' properties
Asked Answered
R

4

16

Given the below synchronous kafka producer

Properties props = new Properties();
props.put("max.block.ms", 30000);
props.put("request.timeout.ms", 30000);
props.put("retries", 5);

KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);

//Send message
producer.send(producerRecord).get();

help me understand the difference between request.timeout.ms and max.block.ms producer configs. Does either include max time for all retries? Or does each retry have its own timeout?

Redcap answered 24/1, 2017 at 18:11 Comment(0)
R
11

request.timeout.ms is used to timeout request, I would set this to the maximum time I can wait for the response.

max.block.ms is used for producer to block buffer time, serialization time etc.

For details look at this one. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient

Rawdan answered 24/1, 2017 at 21:19 Comment(1)
request.timeout.ms(producer config) should be larger than replica.lag.time.max.ms(broker config) to reduce the possibility of message duplication due to producer retries. SourcePhidippides
S
13

I found the accepted answer a bit "thin", so may be this will help others.

There are two important things in your code:

KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);

//Send message
producer.send(producerRecord).get();

producer.send(producerRecord) - that is made from two parts : blocking and non-blocking. The blocking part is made from some "parts":

- Request metadata about brokers from zookeeper 
- Serialize message
- Choose a Partition 
- Place Message in the RecordAccumulator

Now, usually, the first three steps are fast (first one is cache after the initial call), while the fourth one might take time. It happens because RecordAccumulator has limited space (buffer.memory) and how much space you have there at the moment depends on the other thread in the producer client (called Sender Thread). If this thread is doing a bad job (retrieving and sending messages from RecordAccumulator to the brokers; there are metrics for all of this btw), your send method will be blocked (there is no space in the RecordAccumulator) until space becomes available.

All of these 4 steps are allowed to be blocked to a max of max.block.ms. And this is what the KIP means when it talks about:

  • metadata fetch time (fetches metadata from zookeeper about brokers)
  • buffer full block time (the time I was saying about)
  • serialization time (customized serializer)
  • partitioning time (customized partitioner)

There is also the delivery.timeout.ms. It's the total time to wait before the message is sent to the partition and it includes: time to push the record to a batch (in the RecordAccumulator) + time to get acks (for example all and wait for the message to be replicated across replicas) + time to send the message to the broker, including all retries, if any.

You can think about it as the time it takes from send method down to when it reaches all replicas and acks is sent back. All this time has to be lower than delivery.timeout.ms.


Before explaining request.timeout.ms, imho it's important to understand what max.in.flight.requests.per.connection is, because they are a little connected. Suppose that a batch is ready to be sent to the broker from the RecordAccumulator (either because its batch.size or linger.ms was fulfilled). If this batch is taken by the so-called "Sender Thread" (which is a thread in the client itself and is != the thread that calls send method) or not is defined by max.in.flight.requests.per.connection.

You can have as many as max.in.flight.requests.per.connection concurrent requests being active at any point in time. A slightly easier way to think about it is this. A "Sender Thread" has a certain loop that it constantly does, in pseudo-code:

while(true) {
    // check if there are batches to send
    // get the batches to send to the brokers
    // make requests to the broker
    // poll connections
    // etc
}

Suppose that this is the first batch to send. "Sender Thread" increments max.in.flight.requests.per.connection so that it becomes 1; gets the batches and sends it to the broker. It does not wait for an acknowledgment at this point in time, but goes back to its loop. So on, until it reaches 5 (default value for max.in.flight.requests.per.connection).

Now suppose there is batch to send to the broker, Sender Thread is not going to take since it has no available requests (we are at max 5 now). Instead it will "Poll Connections": it will ask the broker for the result of the previously sent and the rest of the explanation is here


With all this background it's time to look at request.timeout.ms, which is actually pretty easy now. When the client polls for connections - tries to get a response from the broker for each in flight request, it can do that within the request.timeout.ms (which by default is 30 seconds). This value is reset if we retry.

Spoilt answered 29/9, 2022 at 20:23 Comment(0)
R
11

request.timeout.ms is used to timeout request, I would set this to the maximum time I can wait for the response.

max.block.ms is used for producer to block buffer time, serialization time etc.

For details look at this one. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient

Rawdan answered 24/1, 2017 at 21:19 Comment(1)
request.timeout.ms(producer config) should be larger than replica.lag.time.max.ms(broker config) to reduce the possibility of message duplication due to producer retries. SourcePhidippides
A
1

So where is a place for retries and retry.backoff.ms?

When i tried to send request to broker/server that is turned off (max.block.size = 60000, request.timeout.ms = 1000, delivery.timeout.ms = 2000, retries= 2, retry.backoff.ms = 100)

   try {
        kafkaTemplate.send(producerRecord).get();
    }catch (Exception e){
        LOGGER.error("Error while sending request: {}", e);
    }

I get timeout after 60s -> max.block.ms

Exception thrown: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic DUMMY-TOPIC not present in metadata after 60000 ms.

Does any retry is happening under the hood ?

Does timeout should appear earlier? it Should take less then 7s -> delivery.timeout 2s for 3 times and backoff time is just 100 ms

Even if i set kafkaTemplate.send(producerRecord).get(1000,TimeUnit.MILLISECONDS) It still waits to 60s -> max.block.ms till this method throws exception. Why the timeout exception was not for 1s.

Full props of producer:

    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9093]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = DummyClient-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 2000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 50
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 6000000
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 1000
    retries = 2
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
Arawak answered 20/11, 2023 at 21:41 Comment(0)
S
0

max.block.ms is also used to wait when fetching metadata.

Soap answered 13/12, 2023 at 10:40 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.