Kafka Consumer CommitFailedException

I am working on a Kafka consumer program. We recently deployed it to our PROD environment, where we ran into the following issue:

[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - No. of records fetched: 1
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Discovered group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null)
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Attempt to heartbeat failed for since member id consumer-otm-opl-group-1-953dfa46-9ced-472f-b24f-36d78c6b940b is not valid.
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch start offset: 9329428
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch Processing Successful.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Failing OffsetCommit request since the consumer is not part of an active group
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:936)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1387)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1349)
    at com.cisco.kafka.consumer.RTRKafkaConsumer.main(RTRKafkaConsumer.java:72)

My understanding is that while the group coordinator is unavailable and being rediscovered, the heartbeat interval (3 seconds as per the documentation) expires and the consumer is kicked out of the group. Is this correct? If so, what would be a workaround? If I'm wrong, please help me understand this issue and suggest how to fix it. I can share the code if needed.

Goodtempered answered 30/3, 2020 at 5:59

The Exception you are referring to

Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

gives a hint on what is happening and what can be done to solve the problem. In the code this Exception is described as

"This exception is raised when an offset commit with KafkaConsumer#commitSync() fails with an unrecoverable error. This can happen when a group rebalance completes before the commit could be successfully applied. In this case, the commit cannot generally be retried because some of the partitions may have already been assigned to another member in the group."

In my experience this error can have several different causes, all of which come down to the consumer no longer being assigned to the partition:

  1. Creating more and more Consumers without closing them
  2. Timeout of poll
  3. Timeout of heartbeat
  4. Outdated Kerberos ticket

1. Creating more and more Consumers without closing them

A rebalance takes place whenever you add a consumer to an existing ConsumerGroup. Therefore, it is essential to close the consumer after use, or to reuse the same instance instead of creating a new KafkaConsumer object for every message/iteration.

2. Poll Timeout (as explained in the Error message):

[...] that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.

The configuration max.poll.interval.ms defaults to 300000 ms (5 minutes). If your consumer takes longer than that between two calls to poll(), it is considered failed and the group will rebalance in order to reassign its partitions to another member (see Consumer Configuration).

Solution for Poll Timeout:

A possible solution is also given in the error message

You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
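The two settings named in the error message can be sized with simple arithmetic: with the defaults (max.poll.records = 500, max.poll.interval.ms = 300000), a batch only fits if each record takes at most 300000 / 500 = 600 ms on average. The following is a minimal sketch of that calculation, not part of the Kafka client API. The class PollBudget, its methods, the safety factor, and the 2 seconds per record are hypothetical values chosen for illustration; only the two property names come from Kafka.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
final class PollBudget {

    // Largest batch size whose worst-case processing time still fits into
    // max.poll.interval.ms. safetyFactor (0 < f <= 1) is the share of the
    // interval we allow processing to use, e.g. 0.5 = half of it.
    static int maxPollRecords(long maxPollIntervalMs, long worstCaseMsPerRecord, double safetyFactor) {
        if (maxPollIntervalMs <= 0 || worstCaseMsPerRecord <= 0
                || safetyFactor <= 0 || safetyFactor > 1) {
            throw new IllegalArgumentException("arguments must be positive and safetyFactor <= 1");
        }
        long budgetMs = (long) (maxPollIntervalMs * safetyFactor);
        long records = budgetMs / worstCaseMsPerRecord;
        // Never return less than 1: poll() must be allowed to return a record.
        return (int) Math.max(1, Math.min(records, Integer.MAX_VALUE));
    }

    // Builds the two consumer properties mentioned in the error message.
    static Properties pollSettings(long maxPollIntervalMs, int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        long intervalMs = 300_000L;         // Kafka default (5 minutes)
        long worstCaseMsPerRecord = 2_000L; // assumed measurement, replace with your own
        int records = maxPollRecords(intervalMs, worstCaseMsPerRecord, 0.5);
        System.out.println(pollSettings(intervalMs, records));
    }
}
```

The resulting Properties would be merged into the configuration passed to the KafkaConsumer constructor. If the computed batch size comes out at 1 and processing is still too slow, increasing max.poll.interval.ms is the remaining lever.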

Because the commit fails (as the error shows), the offsets of the processed records are never stored. That means that if you restart the consumer with the same group.id, it resumes from the last successfully committed offset and reads the uncommitted messages again, as if it had never processed them.

3. Timeout of Heartbeat

There are two main configurations in your KafkaConsumer that deal with heartbeats: heartbeat.interval.ms and session.timeout.ms.

In a separate background thread, your KafkaConsumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, the consumer is considered dead and its partitions are reassigned. Once the rebalance is triggered, your consumer can no longer commit anything for a previously assigned partition, as stated in the description of the CommitFailedException: "This can happen when a group rebalance completes before the commit could be successfully applied."

Solution for Heartbeat Timeout:

Increase the settings heartbeat.interval.ms and session.timeout.ms while following the recommendation: "The heartbeat.interval.ms must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value."

Just keep in mind that changing these values always comes with a trade-off. You have either

  • more frequent rebalances but shorter reaction time to identify dead consumers or
  • less frequent rebalances and longer reaction time to identify dead consumers.
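The 1/3 recommendation quoted above can be expressed as a small check. This is only a sketch under stated assumptions: the class HeartbeatSettings and its methods are hypothetical helpers, not Kafka API, and the 30 second session timeout is an example value rather than a recommendation for your cluster. Only the two property names come from Kafka.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
final class HeartbeatSettings {

    // Derives heartbeat.interval.ms as one third of the chosen session timeout.
    static Properties forSessionTimeout(int sessionTimeoutMs) {
        if (sessionTimeoutMs < 3) {
            throw new IllegalArgumentException("session timeout too small: " + sessionTimeoutMs);
        }
        int heartbeatIntervalMs = sessionTimeoutMs / 3;
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        props.setProperty("heartbeat.interval.ms", Integer.toString(heartbeatIntervalMs));
        return props;
    }

    // True if the heartbeat interval is lower than the session timeout
    // and no higher than one third of it.
    static boolean followsRecommendation(Properties props) {
        int session = Integer.parseInt(props.getProperty("session.timeout.ms"));
        int heartbeat = Integer.parseInt(props.getProperty("heartbeat.interval.ms"));
        return heartbeat < session && heartbeat * 3L <= session;
    }

    public static void main(String[] args) {
        Properties props = forSessionTimeout(30_000); // example value only
        System.out.println(props + " ok=" + followsRecommendation(props));
    }
}
```

Note that the broker also restricts which session timeouts it accepts (group.min.session.timeout.ms and group.max.session.timeout.ms), so a larger value on the consumer side may require a matching broker configuration.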

4. Outdated Kerberos ticket

On our production cluster we have seen the CommitFailedException just after the application was not able to renew the Kerberos ticket.

Commissariat answered 30/3, 2020 at 7:3

We had a similar issue and addressed it by reducing max.poll.records from its default of 500 and also reducing the heartbeat interval. If your message processing takes time and max.poll.records is 500, chances are high that you will get the CommitFailedException.

Fabaceous answered 24/9, 2020 at 9:54

I'm using KStream and was getting the same exception. I increased max.poll.interval.ms and set the heartbeat interval as recommended in the Confluent documentation, which resolved my problem. Note that if your processing takes even longer, you can set max.poll.records to 1 and give it a try.

https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/

Odom answered 29/11, 2022 at 6:27

I had a similar issue and was able to solve it by reducing max.poll.records from its default of 500 to a smaller number such as 100-200. If your message processing takes time and max.poll.records is 500, chances are high that you will get the CommitFailedException.

Cottonade answered 20/10, 2022 at 13:25
