How to fix kafka.common.errors.TimeoutException: Expiring 1 record(s) xxx ms has passed since batch creation plus linger time
I am using kafka_2.11-2.1.1, and the producer is built with Spring Kafka 2.1.0.RELEASE.

While I am sending messages to the Kafka topic, my producer generates a lot of TimeoutExceptions:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time

I am using the Kafka producer settings below:

acks: 1
retries: 1
batchSize: 100
lingerMs: 5
bufferMemory: 33554432
requestTimeoutMs: 60

I tried many combinations (especially batchSize and lingerMs), but nothing worked. Any help, please: what should these settings be for the above scenario?

I tried again with the configs below, but no luck; same error:

acks = 1
    batch.size = 15
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class com.spgmi.ca.prescore.partition.CompanyInfoPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120
    retries = 1

Second run:

I tried different combinations, but nothing worked. Hence I thought it could be a problem with the network, SSL, etc. So I installed and ran Kafka on the same machine where the producer is running, i.e. on my local computer.

I tried running the producer again, pointing at the local Kafka topic. But no luck; same issue.

Below are the configuration parameters used.

2019-07-02 05:55:36.663  INFO 9224 --- [lt-dispatcher-2] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 0
    bootstrap.servers = [localhost:9092]
    request.timeout.ms = 60
    retries = 1
    buffer.memory = 33554432
    linger.ms = 0
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null

I am facing the same error: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for inbound_topic--1: 69 ms has passed since batch creation plus linger time

I also tried batch.size = 5, 10 and 0; linger.ms = 0, 5, 10, etc.; and request.timeout.ms = 0, 45, 60, 120, 300, etc.

Nothing is working; same error.

What else should I try? What could be the solution?

How to avoid generating a negative partition number

Yes, I set up Kafka locally and printed a log with the partition info, which shows the following:

2019-07-03 02:48:28.822  INFO 7092 --- [lt-dispatcher-2] c.s.c.p.p.CompanyInfoPartitioner : Topic : inbound_topic Key = 597736248- Entropy Cayman Solar Ltd.-null-null-null Partition = -1

2019-07-03 02:48:28.931 ERROR 7092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='597736248- Entropy Cayman Solar Ltd.-null-null-null' and payload='com.spgmi.ca.prescore.model.Company@8b12343' to topic inbound_topic :

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for inbound_topic --1: 104 ms has passed since batch creation plus linger time

My topic inbound_topic has two partitions, as you can see below:

C:\Software\kafka\kafka_2.11-2.1.1\bin\windows>kafka-topics.bat --describe --zookeeper localhost:2181 --topic inbound_topic
Topic:inbound_topic    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: inbound_topic    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: inbound_topic    Partition: 1    Leader: 0    Replicas: 0    Isr: 0

But my producer seems to be trying to send to Partition = -1.

My partition logic is as follows:

int p = (((String) key).hashCode() * Integer.MAX_VALUE) % numPartitions;
logger.info("Topic : " + topic + "\t Key = " + (String) key + " Partition = " + p);

I am calling hashCode() on the key. What needs to be corrected here to avoid the negative partition number, i.e. Partition = -1?

What should my partition key logic look like?

Any help is highly appreciated.

Exactitude answered 28/6, 2019 at 12:43 Comment(0)

The error indicates that some records are put into the queue at a faster rate than they can be sent from the client.

When your producer sends messages, they are stored in a buffer (before being sent to the target broker), and the records are grouped together into batches in order to increase throughput. When a new record is added to a batch, it must be sent within a configurable time window, controlled by request.timeout.ms (the default is 30 seconds). If the batch sits in the queue for longer than that, a TimeoutException is thrown, and the batch records are then removed from the queue and will not be delivered to the broker.

Increasing the value of request.timeout.ms should do the trick for you.


In case this does not work, you can also try decreasing batch.size so that batches are sent more often (but each will include fewer messages), and make sure that linger.ms is set to 0 (the default value).
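As a sketch of the suggested tuning (the keys are the standard kafka-clients config names; the values here are illustrative): note that request.timeout.ms is specified in milliseconds, so a value of 60 as used in the question means only 60 ms, far below the 30-second default.

```java
import java.util.Properties;

public class ProducerTuning {
    // Builds producer settings aimed at avoiding batch expiry: a generous
    // request.timeout.ms (milliseconds!) and linger.ms = 0. This Properties
    // object would be passed to `new KafkaProducer<>(props)` along with the
    // key/value serializer settings.
    public static Properties tunedProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "1");
        props.put("request.timeout.ms", "30000"); // the client default; 60 would mean 60 ms
        props.put("linger.ms", "0");              // send as soon as possible
        props.put("batch.size", "16384");         // default 16 KB; smaller sends batches sooner
        return props;
    }
}
```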

Note that you need to restart your Kafka brokers after changing any broker-level configuration parameter (i.e. those in server.properties).

If you still get the error, I assume something is wrong with your network. Have you enabled SSL?

Efta answered 28/6, 2019 at 14:14 Comment(4)
Thank you so much for your detailed explanation. I tried earlier with request.timeout.ms = 120 and also increased it further, but no use. Now I have tried linger.ms = 0 as you suggested but still get the same error. FYI, I updated the question. - Exactitude
Why should we restart the Kafka brokers after changing any of these configuration parameters? These parameters are set in the producer config, so how would changing them affect the brokers' configuration and require a restart? Really confused; can you please shed more light on this? - Exactitude
@Shyam I was talking about broker-level configuration parameters (i.e. those in server.properties). - Efta
@giorgos-myrianthous Do any of the properties you mentioned above belong in the server.properties file, or do they all belong in the producer configs? - Jacklyn

I fixed the earlier issue by returning a valid (non-negative) partition number.
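For reference, a minimal sketch of one way to return a valid partition (an assumption on my part: it mirrors the sign-bit masking that Kafka's own default partitioner uses; the multiplication by Integer.MAX_VALUE from the question is dropped, since it overflows and can produce negative values):

```java
public class SafePartition {
    // Returns a partition in [0, numPartitions) by masking the sign bit of
    // the key's hash, so even Integer.MIN_VALUE maps to a non-negative value.
    public static int partition(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```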

Exactitude answered 3/7, 2019 at 8:24 Comment(6)
Yes, for anyone else with the same problem: I was searching all over, and the many references to request.timeout.ms, batch.size, linger.ms, etc. did not pan out. The problem was the negative partition number. - Backwardation
Can you please explain? - Mohr
@SatyaPavan Sometimes I still get this issue; I have yet to figure out a solution. I tweaked a few things but did not get it resolved. - Exactitude
Have you tried updating the advertised listeners on the Kafka side? - Curious
@SatyaPavan Tried; the Kafka cluster is used by others too, and they don't have any problem. - Exactitude
You shouldn't even be using object hash codes. The built-in partitioner uses Murmur2 hashes, which are consistent across different JVM distributions. - Abstractionist

Maybe you have to check your ProducerListener. Have you implemented this class? I had the same problem as you: a slow ProducerListener implementation will affect your sending results, so it is worth optimizing it. For example:

@Override
public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
    // db: update table -- you should remove this, or use in-memory operations instead
}
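The idea above can be sketched in plain Java (the class and method names here are illustrative stand-ins, not the actual spring-kafka ProducerListener interface): the callback hands slow work to its own executor, so the thread the producer invokes it from returns immediately.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncSendListener {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final CountDownLatch done = new CountDownLatch(1);

    // Invoked from the producer's callback path; must not block.
    public void onSuccess(String topic, long offset) {
        worker.submit(() -> updateDatabase(topic, offset)); // slow work runs off-thread
    }

    private void updateDatabase(String topic, long offset) {
        // placeholder for the slow operation (e.g. a table update)
        done.countDown();
    }

    // Helper: wait until the background work has completed.
    public boolean awaitDone() {
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```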

Middle answered 19/10, 2023 at 6:6 Comment(1)
As it's currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center. - Maneuver
