Kafka producer TimeoutException: Expiring 1 record(s)

I am using Kafka with Spring Boot:

Kafka Producer class:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaProducer.class);

    // Send a message asynchronously and log the result via callback
    public void sendMessage(String topicName, String message) throws Exception {
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }
        });
    }
}

Kafka configuration:

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093

Let's say I have sent 1,000 messages to the topic my-test-topic, ten times.

Eight out of ten times my consumer successfully receives all of the messages, but sometimes I get the error below:

2017-10-05 07:24:11, [ERROR] [my-service - LoggingProducerListener - onError:76] Exception thrown when sending a message with key='null' and payload='{"deviceType":"X","deviceKeys":[{"apiKey":"X-X-o"}],"devices...' to topic my-test-topic

followed by:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-test-topic-4 due to 30024 ms has passed since batch creation plus linger time

Meteorite answered 9/10, 2017 at 15:15. Comments (6):
Is this error you are describing coming from the producer or the consumer? – Zoroastrianism
I am getting this error on the producer. – Meteorite
So your batch is too slow for such a "low" request.timeout.ms. Try making batch-size a bit lower. – Dorri
Isn't 30 seconds enough? (I am new to Kafka, please bear with me.) – Meteorite
I don't know, but according to your error you really are exceeding those 30 secs: "due to 30024 ms has passed". – Dorri
Any news here? I've got the same issue with Spring Cloud Stream and the Kafka binder. – Barman

There are 3 possibilities:

  1. Increase request.timeout.ms - this is the time the producer will wait for the whole batch to be ready in the buffer. So in your case, if the buffer holds fewer than 100,000 bytes (your batch-size), the timeout will occur. More info here: https://mcmap.net/q/353384/-when-does-the-apache-kafka-client-throw-a-quot-batch-expired-quot-exception
  2. Decrease batch-size - related to the previous point: batches will be sent more often, but each will contain fewer messages. See the property sketch after this list.
  3. Depending on message size, maybe your network cannot keep up with the high load? Check whether your throughput is the bottleneck.
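
For illustration, here is how the first two tweaks might look in application.properties. Note that not every Kafka setting has a dedicated Spring Boot key; depending on your Spring Boot version, client settings such as request.timeout.ms and linger.ms may need the spring.kafka.producer.properties. (or the shared spring.kafka.properties.) prefix to actually reach the Kafka client. The values below are illustrative, not recommendations:

spring.kafka.producer.batch-size=16384
spring.kafka.producer.properties.request.timeout.ms=60000
spring.kafka.producer.properties.linger.ms=10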
Alvera answered 30/12, 2017 at 12:49. Comments (4):
I have had the same issue as the OP ever since I enabled SSL on Kafka, and I notice that, like me, he has set linger.ms. According to the documentation, batches are sent out after the linger time even if the batch is not full, so even with a high batch size it should not time out. – Epigram
@Alvera After reading and understanding these two articles: 1) https://mcmap.net/q/353384/-when-does-the-apache-kafka-client-throw-a-quot-batch-expired-quot-exception and 2) cloudera.com/documentation/kafka/latest/topics/…, I felt we should increase batch-size to avoid the timeout. If we increase batch-size -> the number of batches is reduced -> the number of requests decreases -> the time taken to send records decreases -> the timeout will not occur as frequently. – Drayton
request.timeout.ms tells how long a batch can remain in the buffer before it times out. It is not the time Kafka waits for the buffer to be filled. – Breathe
I'm seeing this issue during times of low volume, when the three possibilities above do not apply. – Balcom
  1. The first clue in the error is 30024 ms has passed - the configuration spring.kafka.producer.request.timeout.ms=30000 is related. This 30-second wait is for filling up the buffer on the producer side.

  2. When a message is published, it gets buffered on the producer side, and the producer waits up to 30 s (see the config above) for the buffer to fill. spring.kafka.producer.batch-size=100000 means 100 KB, so if the message ingestion load is low and the buffer doesn't fill up to 100 KB within 30 s, you would expect this error.

  3. spring.kafka.producer.linger.ms=10 is for when the ingestion load is high and the producer wants to limit send() calls to the Kafka brokers. It is the duration the producer waits before sending messages to the broker after the batch is ready (i.e. after the buffer has filled up to the batch size of 100 KB).

Solution:

  • Increase linger.ms to hold messages longer once the batch is ready. If more time is needed to fill the batch, increase request.timeout.ms.
  • Another approach: reduce batch-size, or increase request.timeout.ms, or both. A configuration sketch in Java follows.
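
For reference, a minimal sketch of this tuning done programmatically with Spring Kafka, using the standard ProducerConfig constants (the bean wiring and the values are illustrative, not a definitive recommendation):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.162:9093");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // A smaller batch fills (and is therefore sent) sooner under low load.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Allow more time before a request (or, on older clients, a buffered batch) expires.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}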
Phalan answered 4/1, 2021 at 13:39. Comment (1):
1. Completely wrong, see the documentation: "The configuration controls the maximum amount of time the client will wait for the response of a request". 2. Completely wrong, see above. 3. Completely wrong, see the documentation: "once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up" docs.confluent.io/platform/current/installation/configuration/… – Footstall

In my case, the topic's replication factor was less than min.insync.replicas, so a write (with acks=all) could never gather enough acknowledgments and therefore timed out. Recreating the topic with a replication factor >= min.insync.replicas fixed it.
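
As an illustration, a minimal sketch using Kafka's AdminClient to recreate the topic with a sufficient replication factor (the topic name, partition count, and min.insync.replicas value are assumptions; adjust to your cluster):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class RecreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Replication factor (3) must be >= min.insync.replicas (2);
            // otherwise acks=all writes can never gather enough acknowledgments.
            NewTopic topic = new NewTopic("my-test-topic", 4, (short) 3)
                    .configs(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}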

Quirita answered 17/6, 2022 at 6:15.

I resolved this issue by putting the brokers' DNS names in spring.kafka.bootstrap-servers. Even though the network resolved the IP addresses, the client seems to need the DNS names.
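
For example (the hostnames below are placeholders; they should match what the brokers advertise in advertised.listeners):

spring.kafka.bootstrap-servers=kafka-1.example.com:9092,kafka-2.example.com:9093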

Gompers answered 15/9, 2021 at 8:17.
