Kafka rebalancing and rebalance listener pitfalls

I am reading Kafka: The Definitive Guide and would like to better understand the rebalance listener. The example in the book simply uses a HashMap to track the offsets that have been processed and commits that state when a partition is revoked.

I have two questions about the code example:

  1. The wording leads me to assume that these callbacks are made on a different thread. If so, shouldn't thread safety be considered when updating the current offsets? Additionally, shouldn't the current batch be cancelled once those offsets are committed?
  2. It says to use commitSync() to make sure offsets are committed before the rebalance proceeds. However, this is only synchronous within that consumer. Is there some mechanism whereby the coordinator does not proceed until it hears back from all subscribed consumers?
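To make the pattern described above concrete, here is a toy, single-threaded model of it in Java. It is not the Kafka client API: partitions are plain strings, and `ToyOffsetStore`, `BookStyleConsumer`, `markProcessed` and the `rebalancePending` flag are hypothetical names invented for this sketch. It assumes, as the Javadoc quoted in the answer states, that the revoke callback runs inside `poll()` on the consumer's own thread, and it only mirrors the shape of the approach: a `HashMap` of processed offsets that the callback commits synchronously.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: this is NOT the Kafka client API. Partitions are plain
// strings and ToyOffsetStore stands in for the broker's committed offsets.
class BookStyleDemo {
    public static void main(String[] args) {
        ToyOffsetStore store = new ToyOffsetStore();
        BookStyleConsumer consumer = new BookStyleConsumer(store);
        consumer.markProcessed("orders-0", 41);
        consumer.markProcessed("orders-0", 42);
        consumer.markProcessed("orders-1", 7);
        consumer.poll(true); // a rebalance is pending: the revoke callback commits
        System.out.println(store.committed + " committed on " + consumer.revokeThreads);
    }
}

class ToyOffsetStore {
    final Map<String, Long> committed = new HashMap<>();

    // Plays the role of commitSync(Map<TopicPartition, OffsetAndMetadata>).
    void commitSync(Map<String, Long> offsets) {
        committed.putAll(offsets);
    }
}

class BookStyleConsumer {
    private final ToyOffsetStore store;
    // Next offset to read per partition. Only the polling thread touches it,
    // so the map needs no locking.
    final Map<String, Long> currentOffsets = new HashMap<>();
    // Threads that ran the revoke callback, recorded for inspection.
    final List<String> revokeThreads = new ArrayList<>();

    BookStyleConsumer(ToyOffsetStore store) {
        this.store = store;
    }

    // Kafka commits the offset of the NEXT record to read, hence offset + 1.
    void markProcessed(String partition, long offset) {
        currentOffsets.put(partition, offset + 1);
    }

    // Plays the role of ConsumerRebalanceListener.onPartitionsRevoked().
    void onPartitionsRevoked() {
        revokeThreads.add(Thread.currentThread().getName());
        store.commitSync(currentOffsets);
        // This sketch's own choice: forget offsets for partitions we may no longer own.
        currentOffsets.clear();
    }

    // Plays the role of poll(): a pending rebalance is handled here,
    // synchronously, on the thread that called poll().
    void poll(boolean rebalancePending) {
        if (rebalancePending) {
            onPartitionsRevoked();
        }
        // Fetching the next batch would happen here.
    }
}
```

In this model the callback can only run between batches, because it is invoked from `poll()` by the same thread that processes records, so there is no concurrent access to the map and no half-processed batch left to cancel when it fires.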
Wingspread asked 11/4, 2018 at 17:58
  1. I re-read the section in the book and I agree I was a bit confused too!

    The Javadoc states:

    This callback will only execute in the user thread as part of the poll(long) call whenever partition assignment changes.

    I had a look at the code and confirmed the rebalance listener methods are indeed called in the same thread that owns the Consumer.

  2. Yes, you should use commitSync() when committing in the rebalance listener.

    To explain why, let's look at the golden path. We start with a consumer happily consuming and regularly sending heartbeats to the coordinator. At some point the coordinator responds to a heartbeat request with a REBALANCE_IN_PROGRESS error. This can be caused by a new member joining the group, a member leaving or failing to heartbeat, or partitions being added to or removed from the subscription. At this point, all consumers need to rejoin the group.

    Before attempting to rejoin the group, the consumer synchronously executes ConsumerRebalanceListener.onPartitionsRevoked(). Once the listener returns, the consumer sends a JoinGroup request to the coordinator to rejoin the group.

    That said, and I think this is what you were getting at, if your callback takes too long (> session.timeout.ms) to commit, the group may already be in another generation, and the partitions whose offsets you are trying to commit may already be assigned to another member. In that case, the commit will fail even though it was synchronous. But by using commitSync() in the listener, you are guaranteed that the consumer won't rejoin the group before the commit completes.
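The ordering described in this answer, and the failure case at the end, can be sketched with another toy model. Again, this is not Kafka's group coordinator: `ToyCoordinator`, `ToyMember`, `StaleGenerationException` and the single integer generation are hypothetical simplifications assumed for illustration. The member commits before it sends its join request, and the coordinator rejects any commit tagged with a generation other than its current one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT Kafka's group coordinator. One integer generation and
// a string log stand in for the real protocol state.
class FencingDemo {
    public static void main(String[] args) {
        // Scenario 1: the callback commits in time, while the group is still
        // waiting for members to rejoin (generation unchanged).
        ToyCoordinator c1 = new ToyCoordinator();
        ToyMember fast = new ToyMember("fast", c1);
        fast.rejoin("orders-0", 43);
        System.out.println(c1.log);

        // Scenario 2: the callback was too slow; the group already completed
        // a rebalance without this member, so its generation is stale.
        ToyCoordinator c2 = new ToyCoordinator();
        ToyMember slow = new ToyMember("slow", c2);
        c2.generation++;
        slow.rejoin("orders-0", 43);
        System.out.println(c2.log);
    }
}

class StaleGenerationException extends RuntimeException {
    StaleGenerationException(String message) {
        super(message);
    }
}

class ToyCoordinator {
    int generation = 1;
    final Map<String, Long> committed = new HashMap<>();
    final List<String> log = new ArrayList<>();

    // Accept a commit only if it carries the current generation.
    void commit(int memberGeneration, String partition, long offset) {
        if (memberGeneration != generation) {
            log.add("reject commit gen " + memberGeneration);
            throw new StaleGenerationException(
                "generation " + memberGeneration + " is not current (" + generation + ")");
        }
        committed.put(partition, offset);
        log.add("commit " + partition + "@" + offset);
    }

    void join(String member) {
        log.add("join " + member);
    }
}

class ToyMember {
    private final String name;
    private final ToyCoordinator coordinator;
    private final int generation; // generation this member last joined

    ToyMember(String name, ToyCoordinator coordinator) {
        this.name = name;
        this.coordinator = coordinator;
        this.generation = coordinator.generation;
    }

    // Mirrors the rejoin path inside poll(): the revoke callback commits
    // synchronously, and only after it returns is the join request sent.
    // Returns whether the commit was accepted.
    boolean rejoin(String partition, long nextOffset) {
        boolean committed;
        try {
            coordinator.commit(generation, partition, nextOffset);
            committed = true;
        } catch (StaleGenerationException e) {
            // The real commitSync() would throw CommitFailedException here.
            committed = false;
        }
        coordinator.join(name);
        return committed;
    }
}
```

In the first scenario the log shows the commit landing before the join. In the second, the commit is rejected even though the member waited for it synchronously, and the member simply goes on to rejoin. Synchronous commit only orders this member's own commit before its own join; it gives no guarantee about what the rest of the group has already done.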

Yare answered 20/4, 2018 at 15:19
So, to clarify: a commitSync() that happens after the group has already rebalanced will be rejected by the broker. Is that a non-retriable error that the consumer can handle as it sees fit? – Wingspread
In that case, the commit fails (commitSync() throws a CommitFailedException; with commitAsync() the failure is passed to the commit callback) and the consumer simply attempts to rejoin the group. The member of the group that is assigned the partitions that couldn't be committed will restart from the last successfully committed offset. – Yare
@MickaelMaison Does your first point mean that the rebalance listener only gets executed when poll() is called again? If so, the example in the book makes no sense, because the offsets will already have been committed while the records were being processed, and there is no need to commit them again in the rebalance listener. – Zo
@MickaelMaison The example in the book only makes sense if the heartbeat thread running in the background has a way of interrupting the processing thread and handing control back to poll(), like a goto would. – Zo
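As Yare's earlier comment says, when the final commit is rejected the new owner restarts from the last successfully committed offset, so the records processed since then are delivered a second time (at-least-once behaviour). A tiny arithmetic sketch of that gap, using hypothetical offsets and a hypothetical `Redelivery` helper that is not part of any Kafka API:

```java
import java.util.ArrayList;
import java.util.List;

// Toy arithmetic only, not Kafka code. Offsets follow Kafka's convention that
// a committed offset is the NEXT record to read.
class Redelivery {
    // Offsets the new owner reads again: everything from the last accepted
    // commit up to (excluding) the position the old owner had reached.
    static List<Long> redelivered(long lastCommitted, long processedUpTo) {
        List<Long> again = new ArrayList<>();
        for (long offset = lastCommitted; offset < processedUpTo; offset++) {
            again.add(offset);
        }
        return again;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: the last accepted commit was 40, the old owner
        // then processed 40..42, and its commit of 43 was rejected.
        System.out.println(redelivered(40, 43));
    }
}
```

With these numbers the new owner processes offsets 40, 41 and 42 again, which is why processing in this setup should tolerate duplicates.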
