How to achieve high availability in a Kafka Streams app during deployment?

Main question: we run Kafka Streams (Java) apps on Kubernetes to consume, process and produce real-time data in our Kafka cluster (Confluent Community Edition v7.0 / Kafka v3.0). How can we deploy our apps in a way that limits downtime in record consumption? Our initial target was approx 2 sec of downtime, occurring only a single time for each task.

We are aiming for continuous deployment of changes to the production environment, but deployments are too disruptive: they cause downtime in record consumption in our apps, which leads to latency in the produced real-time records.

We have tried different strategies to see how it affects latency (downtime).

Strategy #1:

  • Terminate all app instances (total of 6)
  • Immediately start all new app instances
  • Result: measured max latency for consumed records: 85 sec

Strategy #2:

  • Start one new app instance
  • Wait 3 minutes to allow restoring the local state in the new app instance
  • After 3 minutes terminate one old app instance
  • Repeat until all old app instances are terminated
  • Result: measured max latency for consumed records: 39 sec

Strategy #3:

  • Same as Strategy #2, but increase wait time to 15 minutes
  • Result: measured max latency for consumed records: 7 sec. However, 15 minutes per app instance leads to 15 min x 6 instances = 90 minutes to deploy a change, plus an additional ~30 minutes for the incremental rebalancing protocol to finish. We find this deploy time quite excessive.

We have been reading KIP-429: Kafka Consumer Incremental Rebalance Protocol and have tried to configure the app to support our use case.

These are the key Kafka Streams configurations we used for Strategy #2 and #3:

acceptable.recovery.lag: 6000
num.standby.replicas: 2
max.warmup.replicas: 6
probing.rebalance.interval.ms: 60000
num.stream.threads: 2
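
For reference, here is a minimal sketch of how these settings map onto a Kafka Streams Properties object in Java (the application id and bootstrap servers below are placeholders, not our actual values):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");    // placeholder
// High-availability / task assignment settings from the list above
props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 6000L);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 6);
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 60_000L);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
// props is then passed to new KafkaStreams(topology, props)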

Input topic has 12 partitions and an average message rate of 800 records/s. There are 3 Kafka Streams key-value state stores, two of which see the same rate as the input topic. These two are roughly 20 GB in size. The key set size is about 4000. In theory, the acceptable.recovery.lag above should correspond to roughly 60 sec of lag on the changelog topic per partition.

Here are some metrics (up, rate of messages received and latency of messages received) per app instance for Strategy #3: [metrics screenshot per app instance]

Notable observations we did:

1a - first new app instance starts

1b - Kafka immediately rebalances; 2 old app instances get assigned more tasks and 2 old instances lose tasks

1c - at the same time the recorded max latency increases from 0.2 sec to 3.5 sec (this indicates that the rebalance takes approx 3 sec)

2 - a probing rebalance occurs and Kafka Streams somehow decides to revoke a task from one of the old app instances and give it to one that already has the most tasks

3 - last old app instance is terminated

4 - all partitions are balanced as they were prior to the upgrade; the incremental rebalance is complete (approx 33 minutes after the last old app instance was terminated)

Other - it takes approx 40 minutes for the first new app instance to get assigned a task. In addition, every task is re-assigned multiple times, causing many small disruptions of approx 3 sec each.

We can provide many more details about the topology, topics, configuration and metrics diagrams for the other strategies if needed (this thread is already huge).

Parsnip answered 22/2, 2022 at 13:51 Comment(2)
Did you find a good solution to your availability problem? We are currently struggling with the same problem :/Adz
We did not find a very good solution. We have switched our focus to low startup time, e.g. via StatefulSets in Kubernetes to restore state from disk, or simply reducing the amount of saved state and using in-memory state stores.Somatoplasm

I'm unable to comment (due to being relatively new to SO), so I'll reply here. Your point about StatefulSets and rolling restarts is correct, but you can fix that by using podManagementPolicy: Parallel in your StatefulSet. This makes it the same as a Deployment in that all pods will come up at the same time.

https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#pod-management-policies

Susceptive answered 30/8, 2022 at 3:13 Comment(0)

Some general advice for your Kafka Streams app (I'm not a pro, just something I have observed personally).

  1. Change your pod from a Deployment to a StatefulSet and add the configuration group.instance.id: "${hostname}". This way each pod keeps the same name, and you will be able to use the Kafka Consumer Incremental Rebalance Protocol
  2. Now that you are using a StatefulSet, persist the state store on permanent storage; this removes the state store reload from Kafka. But take care that you need to properly shut down the Kafka Streams app based on the signal received from Kubernetes, and tune terminationGracePeriodSeconds to allow a proper shutdown (see the sketch after this list). If not, Kafka Streams will detect that the checkpoint is not clean and will re-fetch the state stores (it is not really clear whether your state stores are persisted, but I think that is already done).
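
A minimal sketch of the shutdown part, assuming the stable pod name is read from the HOSTNAME environment variable and the state dir points at the persistent volume (class name, topics, paths and addresses below are placeholders from my side, adjust to your setup):

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class StreamsAppMain {   // hypothetical class name
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");    // placeholder
        // Static membership: reuse the stable pod name from the StatefulSet
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
                System.getenv("HOSTNAME"));
        // Keep the state stores on the persistent volume so a restart restores from disk
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams");   // placeholder path

        KafkaStreams streams = new KafkaStreams(buildTopology(), props);
        CountDownLatch latch = new CountDownLatch(1);

        // Close cleanly on the SIGTERM sent by Kubernetes so checkpoint files get written;
        // keep this timeout below terminationGracePeriodSeconds
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close(Duration.ofSeconds(20));
            latch.countDown();
        }));

        streams.start();
        latch.await();
    }

    private static Topology buildTopology() {
        // Placeholder topology; your real processing logic goes here
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");
        return builder.build();
    }
}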

The only thing I have observed on my side is that a simple consumer group rebalance takes longer than your expected 2 seconds, and that seems to be a complicated goal to reach (as you said, a rebalance takes approx 3 sec). But with incremental rebalancing, I think some partitions will keep being processed during the rebalance (I have not measured this personally yet).

Cholecystitis answered 13/3, 2022 at 20:15 Comment(1)
We have considered switching to StatefulSet and it may very well have to be the way to go. However, this approach would require us to terminate one replica instance before starting the other. This would not take us to our initial target of approx 2 sec downtime, occurring only once per task. And we would always be limited by the time it takes Kubernetes to start our JVM pod. We think this is not in the spirit of what the Kafka team is trying to achieve in KIP-429. And yes, we must accept the 3 sec delay for rebalancing and that's okParsnip
