Kafka keeps rebalancing consumers

We have 10 consumers in a group listening to a topic. What we see very often is the consumers being rebalanced (which completely stops the consumer process for some time).

# ./kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092  --describe  --group ParserKafkaPipeline | grep -e ParserBody | sort
ParserBodyToParse 0          99              99              0               consumer-1-f29b7eb7-b871-477c-af52-446fbf4b0496  /10.12.18.58    consumer-1
ParserBodyToParse 1          97              97              0               consumer-10-6639ee02-8e68-40e6-aca1-eabd89bf828e /10.12.18.58    consumer-10
ParserBodyToParse 2          97              97              0               consumer-11-c712db8b-0396-4388-9e3a-e8e342355547 /10.12.18.58    consumer-11
ParserBodyToParse 3          97              98              1               consumer-12-0cc6fe12-d640-4344-91c0-f15e63c20cca /10.12.18.58    consumer-12
ParserBodyToParse 4          97              98              1               consumer-13-b904a958-141d-412e-83ea-950cd51e25e0 /10.12.18.58    consumer-13
ParserBodyToParse 5          97              98              1               consumer-14-7c70ba88-8b8c-4fad-b15b-cf7692a4b9ce /10.12.18.58    consumer-14
ParserBodyToParse 6          98              98              0               consumer-15-f0983c3d-8704-4127-808d-ec8b6b847008 /10.12.18.58    consumer-15
ParserBodyToParse 7          97              97              0               consumer-18-de5d20dd-217c-4db2-9b39-e2fdbca386e9 /10.12.18.58    consumer-18
ParserBodyToParse 8          98              98              0               consumer-5-bdeaf30a-d2bf-4aec-86ea-9c35a7acfe21  /10.12.18.58    consumer-5
ParserBodyToParse 9          98              98              0               consumer-9-4de1bf17-9474-4bd4-ae61-4ab254f52863  /10.12.18.58    consumer-9

# ./kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092  --describe  --group ParserKafkaPipeline | grep -e ParserBody | sort
Warning: Consumer group 'ParserKafkaPipeline' is rebalancing.
ParserBodyToParse 0          99              99              0               -               -               -
ParserBodyToParse 1          99              99              0               -               -               -
ParserBodyToParse 2          99              99              0               -               -               -
ParserBodyToParse 3          99              100             1               -               -               -
ParserBodyToParse 4          99              100             1               -               -               -
ParserBodyToParse 5          99              100             1               -               -               -
ParserBodyToParse 6          100             100             0               -               -               -
ParserBodyToParse 7          99              99              0               -               -               -
ParserBodyToParse 8          100             100             0               -               -               -
ParserBodyToParse 9          100             100             0               -               -               -

Notice the warning in the second call above.

Consuming these messages might take a long time, but it shouldn't take more than two minutes. I checked that the limit on consumer.poll (max.poll.interval.ms) is 5 minutes, so that shouldn't be an issue. Are there any logs that would show what exactly is happening?

UPDATE:

We use Kafka 2.2.1 and the Java consumer. We didn't change the default values of session.timeout.ms and heartbeat.interval.ms. The consumer is basically waiting for IO from another service, so it is not using any CPU – that is why I expect the heartbeat to be working correctly.

Our consumer code is the following:

    inline fun <reified T : Any> consume(
            topic: KafkaTopic,
            groupId: String,
            batchSize: Int = 50,
            crossinline consume: (key: String?, value: T) -> (Unit)
    ) = thread {
        val consumerProperties = Properties()
        consumerProperties.putAll(properties)
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
        // cap how many records a single poll() can return
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize)

        val consumer = KafkaConsumer<String?, ByteArray>(consumerProperties)

        consumer.subscribe(listOf(topic.toString()))

        // poll-and-process loop; any exception is logged and the loop continues
        while (true) try {
            val records = consumer.poll(Duration.ofMinutes(pollDurationMinutes))
            log.debug("Topic $topic consumed by group $groupId: ${records.count()} records.")
            records.forEach { record -> consumeRecord(record, topic, consume) }
        } catch (e: Exception) {
            log.fatal("Couldn't consume records: ${e.message}.", e)
            // sleep to prevent logging hell on connection failures
            Thread.sleep(1000)
        }
    }
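
A sketch of one way we could make rebalances visible in our own application log (illustrative only, assuming the same log, topic and groupId as above, not what we currently run): KafkaConsumer.subscribe also accepts a ConsumerRebalanceListener, so the subscribe call could be replaced with something like:

    // Illustrative sketch: log every rebalance from the consumer's point of view.
    // Needs org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    // and org.apache.kafka.common.TopicPartition.
    consumer.subscribe(listOf(topic.toString()), object : ConsumerRebalanceListener {
        override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>) {
            log.warn("Rebalance: partitions revoked from group $groupId: $partitions")
        }

        override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>) {
            log.warn("Rebalance: partitions assigned to group $groupId: $partitions")
        }
    })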
Insanitary answered 18/6, 2019 at 10:50 Comment(13)
Have you checked group.max.session.timeout.ms and heartbeat.interval.ms? More details at kafka.apache.org/documentation/#brokerconfigsProlix
Yes, but the heartbeat is running in its own thread so it shouldn't have any impact unless the consumer is really dead. Again: how can I validate what exactly is happening and why the consumers are rebalanced? There must be something to search for in logs.Ambriz
What are the values of group.max.session.timeout.ms and heartbeat.interval.ms? Not sure if GC is coming into play and a stop-the-world pause is taking longer than the session timeout? (Assuming you are using the Java consumer.) What is the Kafka version? What is the behavior when you reduce the number of consumers in the group?Prolix
I updated the question. What is the behavior when you reduce the no of consumers in a group? – I was changing the numbers up and down, but with no difference.Ambriz
Please read this (cwiki.apache.org/confluence/display/KAFKA/…). It seems like max.poll.interval.ms (i.e. 5 min) has elapsed since the last poll and it timed out. Please try to figure out whether that is the case.Prolix
Yes, I read this and I even mention this in my question. However I can confirm poll never takes more than 5 minutes. Again: Is there something in logs which should hint me what really happened?Ambriz
Can you post some code from your consumer?Riva
The logs that would tell you why consumers are rebalancing should be your consumer application logs. Can you post what those show? Or how is it exactly you're consuming from Kafka?Tract
"but it shouldn't take more than two minutes" – sometimes it will. You should check the consumer log for heartbeat-expired and poll-interval messages, if any.Foxtail
Unfortunately there is nothing in the log within my application. I don't see any heartbeats or failures. I will add my code to the original question.Ambriz
What happens once the while loop is exited, either gracefully or with a throwable? It looks to me like the consumer will shut down once that happens, leading to a rebalance.Drumhead
@Vojtěch, I am facing a similar issue. We are using it from Node.js. Did you solve this problem? If so, can you share what helped address the issue?Ascending
We abandoned Kafka and are now using Google Pub/Sub. It works way better.Ambriz

Frequent rebalances are usually caused by the consumer taking too long to process a batch. While the batch is being processed (and heartbeats are not being sent), the broker decides that the consumer has been lost and starts rebalancing.

I would suggest either creating smaller batches by reducing max.partition.fetch.bytes, or allowing more time between heartbeats by increasing heartbeat.interval.ms.
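
A minimal sketch of how those two settings could be applied, reusing the consumerProperties object from the question's code (the values are illustrative, not recommendations):

    // Illustrative values only: smaller per-partition fetches mean smaller batches
    // per poll(); heartbeat.interval.ms must stay well below session.timeout.ms.
    consumerProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 256 * 1024) // default 1048576 (1 MB)
    consumerProperties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000)          // default 3000 ms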

Facia answered 21/6, 2019 at 16:11 Comment(2)
Why wouldn't the heartbeats be sent? They run in a separate thread, and since the CPU is not overloaded they should work correctly. Also, I am already reading with batchSize = 1.Ambriz
Agree – the heartbeat is unrelated. It is there so that a consumer application stopping is quickly detected. It is max.poll.interval.ms (default 5 minutes) or max.poll.records (default 500) that are most likely to help here.Neuron

I think that the first part of Giorgos's answer is correct, up to ".....processing the batch for a long time", but the configuration advice is for a different problem.

There are two causes of a rebalance: too long between polls, or too long between heartbeats. The logs should tell you which caused the rebalance, but it is usually the former.

If the problem is the heartbeat, then the advised configuration changes (and/or session.timeout.ms) may help. The heartbeat runs in a separate thread and allows the group to quickly determine whether a consumer application has died.

If the problem is too long between polls and you can't speed up your processing, then you need to increase the allowed gap between calls to poll or reduce the number of records handled on each poll. The relevant properties are max.poll.interval.ms (default 5 minutes) and max.poll.records (default 500).
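
For example, a sketch using the consumerProperties from the question's code (the values are illustrative and need tuning to the actual processing time):

    // Illustrative values only: allow a longer gap between poll() calls,
    // or hand the processing loop fewer records so each batch finishes sooner.
    consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000) // default 300000 (5 minutes)
    consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50)          // default 500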

Neuron answered 27/3, 2020 at 13:9 Comment(0)

For anyone who encounters this even when they feel record processing is not the bottleneck:

We recently encountered a nasty bug in the Kafka Connect runtime that keeps old heartbeat threads alive and spins up additional Kafka Connect tasks with the same thread name (essentially, older task threads and heartbeat threads are never killed).

The following bugs were encountered in version 2.3.1 and a few other versions, as mentioned in the JIRA issues:

https://issues.apache.org/jira/browse/KAFKA-9841

https://issues.apache.org/jira/browse/KAFKA-10574

https://issues.apache.org/jira/browse/KAFKA-9184

This also happened in Confluent Platform version 5.3.1, so please upgrade your Kafka Connect runtime and connect-api to the latest versions if possible.

Austere answered 26/5, 2022 at 20:26 Comment(0)
