We have 10 consumers in a group listening to a topic. We very often see the consumers being rebalanced, which completely stops consumption for some time.
# ./kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group ParserKafkaPipeline | grep -e ParserBody | sort
ParserBodyToParse 0 99 99 0 consumer-1-f29b7eb7-b871-477c-af52-446fbf4b0496 / consumer-1
ParserBodyToParse 1 97 97 0 consumer-10-6639ee02-8e68-40e6-aca1-eabd89bf828e / consumer-10
ParserBodyToParse 2 97 97 0 consumer-11-c712db8b-0396-4388-9e3a-e8e342355547 / consumer-11
ParserBodyToParse 3 97 98 1 consumer-12-0cc6fe12-d640-4344-91c0-f15e63c20cca / consumer-12
ParserBodyToParse 4 97 98 1 consumer-13-b904a958-141d-412e-83ea-950cd51e25e0 / consumer-13
ParserBodyToParse 5 97 98 1 consumer-14-7c70ba88-8b8c-4fad-b15b-cf7692a4b9ce / consumer-14
ParserBodyToParse 6 98 98 0 consumer-15-f0983c3d-8704-4127-808d-ec8b6b847008 / consumer-15
ParserBodyToParse 7 97 97 0 consumer-18-de5d20dd-217c-4db2-9b39-e2fdbca386e9 / consumer-18
ParserBodyToParse 8 98 98 0 consumer-5-bdeaf30a-d2bf-4aec-86ea-9c35a7acfe21 / consumer-5
ParserBodyToParse 9 98 98 0 consumer-9-4de1bf17-9474-4bd4-ae61-4ab254f52863 / consumer-9
# ./kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group ParserKafkaPipeline | grep -e ParserBody | sort
Warning: Consumer group 'ParserKafkaPipeline' is rebalancing.
ParserBodyToParse 0 99 99 0 - - -
ParserBodyToParse 1 99 99 0 - - -
ParserBodyToParse 2 99 99 0 - - -
ParserBodyToParse 3 99 100 1 - - -
ParserBodyToParse 4 99 100 1 - - -
ParserBodyToParse 5 99 100 1 - - -
ParserBodyToParse 6 100 100 0 - - -
ParserBodyToParse 7 99 99 0 - - -
ParserBodyToParse 8 100 100 0 - - -
ParserBodyToParse 9 100 100 0 - - -
Notice the warning in the second call above.
Processing these messages can take a long time, but it shouldn't take more than two minutes. I checked that the limit on the time between consumer.poll calls (max.poll.interval.ms) is 5 minutes, so that shouldn't be an issue. Are there any logs I can check to see what exactly is happening?
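A quick way to check that reasoning, as a minimal Kotlin sketch. The consumer code below processes every record returned by one poll() (up to batchSize = 50, passed as max.poll.records) before it calls poll() again, so what has to fit into max.poll.interval.ms is the whole batch, not a single message. Assuming the two-minute bound applies to each message rather than to each batch, the worst case is 50 × 2 min = 100 min, far above the 5-minute default. The function names are hypothetical.

```kotlin
// Default max.poll.interval.ms of the Java consumer: 300 000 ms (5 minutes).
const val DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L

// Worst-case time between two poll() calls when the whole batch is processed
// sequentially inside the poll loop (hypothetical helper).
fun worstCaseBatchMs(maxPollRecords: Int, worstCasePerRecordMs: Long): Long =
    maxPollRecords * worstCasePerRecordMs

// True if a worst-case batch still finishes before max.poll.interval.ms expires.
fun fitsPollInterval(
    maxPollRecords: Int,
    worstCasePerRecordMs: Long,
    maxPollIntervalMs: Long = DEFAULT_MAX_POLL_INTERVAL_MS
): Boolean = worstCaseBatchMs(maxPollRecords, worstCasePerRecordMs) <= maxPollIntervalMs
```

For example, fitsPollInterval(50, 120_000L) is false (a worst case of 6 000 000 ms), while fitsPollInterval(2, 120_000L) is true (240 000 ms).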
We use Kafka 2.2.1 and the Java consumer. We didn't change the default values of session.timeout.ms and heartbeat.interval.ms. The consumer is basically waiting for IO from another service, so it is not using any CPU; that is why I expect heartbeats to keep working correctly.
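Since Kafka 0.10.1 (KIP-62) the Java consumer sends heartbeats from a background thread, so a poll loop that is blocked on IO does not stop heartbeats, and session.timeout.ms is not the limit that applies to slow processing. The same background thread does, however, make the member leave the group once the time since the last poll() exceeds max.poll.interval.ms, which triggers a rebalance. The sketch below is a simplified model of that check, not the client's actual code; the function name is hypothetical.

```kotlin
// Simplified model of the consumer's poll-interval check (not the real client code).
// pollTimesMs: times at which poll() was called, in ascending order.
// nowMs: the moment at which the check is made (not earlier than the last poll).
// Returns the time at which max.poll.interval.ms was first exceeded, or null if never.
fun firstPollTimeoutMs(
    pollTimesMs: List<Long>,
    nowMs: Long,
    maxPollIntervalMs: Long = 300_000L
): Long? {
    // Consecutive polls, plus the still-open gap from the last poll until now.
    val boundaries = pollTimesMs + nowMs
    for (i in 0 until boundaries.size - 1) {
        val deadline = boundaries[i] + maxPollIntervalMs
        // Heartbeats keep the session alive, but a gap longer than the
        // poll interval makes the member leave the group at the deadline.
        if (boundaries[i + 1] > deadline) return deadline
    }
    return null
}
```

With polls every two minutes (0, 120 000, 240 000 ms) nothing expires at 300 000 ms, but a single batch that keeps the loop away from poll() for 400 000 ms expires the interval at 300 000 ms, regardless of CPU usage.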
Our consumer code is as follows:
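import java.time.Duration
import java.util.Properties
import kotlin.concurrent.thread
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

// KafkaTopic, log, pollDurationMinutes and consumeRecord are defined elsewhere in our code.
inline fun <reified T : Any> consume(
    topic: KafkaTopic,
    groupId: String,
    batchSize: Int = 50,
    crossinline consume: (key: String?, value: T) -> (Unit)
) = thread {
    val consumerProperties = Properties()
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize)
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
    val consumer = KafkaConsumer<String?, ByteArray>(consumerProperties)
    consumer.subscribe(listOf(topic.name)) // assumes KafkaTopic exposes the topic name as `name`
    while (true) try {
        val records = consumer.poll(Duration.ofMinutes(pollDurationMinutes))
        log.debug("Topic $topic consumed by group $groupId: ${records.count()} records.")
        // The whole batch is processed before the next poll() call.
        records.forEach { record -> consumeRecord(record, topic, consume) }
    } catch (e: Exception) {
        log.fatal("Couldn't consume records: ${e.message}.", e)
        // sleep here to avoid flooding the log on repeated connection failures (not shown)
    }
}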
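If the worst-case processing time per record is known, the two usual levers are a smaller max.poll.records or a larger max.poll.interval.ms. The hypothetical helper below (a sketch, not part of the code above) derives a max.poll.interval.ms value from an assumed worst-case time per record plus headroom; the resulting properties could be merged into consumerProperties. It uses the plain string keys of the two real consumer settings; the 1.5 headroom factor is an arbitrary assumption.

```kotlin
import java.util.Properties

// Hypothetical helper: sizes max.poll.interval.ms so that a full batch of
// maxPollRecords records can be processed between two poll() calls.
fun pollTimingProperties(
    maxPollRecords: Int,
    worstCasePerRecordMs: Long,
    headroom: Double = 1.5 // assumed safety margin
): Properties {
    require(maxPollRecords > 0 && worstCasePerRecordMs > 0 && headroom >= 1.0)
    val maxPollIntervalMs = (maxPollRecords * worstCasePerRecordMs * headroom).toLong()
        .coerceAtMost(Int.MAX_VALUE.toLong()) // max.poll.interval.ms is an int setting
    return Properties().apply {
        // String values are accepted and parsed by the client's config handling.
        setProperty("max.poll.records", maxPollRecords.toString())
        setProperty("max.poll.interval.ms", maxPollIntervalMs.toString())
    }
}
```

For example, pollTimingProperties(5, 120_000L) yields max.poll.interval.ms = 900000. Keep in mind that the Java consumer also uses max.poll.interval.ms as its rebalance timeout, so a very large value lets a member that is stuck in a long batch hold up a rebalance for longer.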
What is the behavior when you reduce the number of consumers in the group?
– I was changing the number up and down, but it made no difference. – Ambriz
"But it shouldn't take more than two minutes": sometimes it will. You should check the consumer log for heartbeat expiry and poll interval messages, if any. – Foxtail