Understanding the Kafka Streams partition assignor

2

6

I have two topics, one with 3 partitions and one with 48.

Initially I used the default assignor, but I ran into problems when a consumer (a pod in Kubernetes) crashed.

What happened was that when the pod came up again, it was reassigned partitions from the topic with 3 partitions but none from the topic with 48.

The two pods that did not crash were assigned 16 and 32 partitions from the topic with 48 partitions.

I've fixed this by using a round-robin partition assignor, but now I don't feel confident about how the partitions are distributed. I'm using KStream-KStream joins, and for those we need to guarantee that each consumer is assigned the same partition number from every topic, e.g. C1: (t1:p0, t2:p0), C2: (t1:p1, t2:p1), etc.

One thing I thought of was to rekey the incoming events so that they are repartitioned. Might that let me guarantee this?

Or maybe I don't understand how the default partitioning works. I'm confused.

Moonset answered 16/4, 2019 at 14:45 Comment(6)
Actually, Kafka Streams does not allow the use of a custom partition assignor, so your custom assignor is probably being ignored. Additionally, according to the Kafka Streams docs, the input topics of a join (left side and right side) must have the same number of partitions.Doty
OK, so I thought of merging the streams instead and then doing a stream.through(newTopic), just to merge the data into one topic with 3 partitions. Then I'd filter it into two streams and do the joins. What do you think of this idea? The other solution I was thinking of was to create a microservice that just forwards the messages to one topic, and then filter that topic into two new KStreams and do the joins. Any better ideas? Also, regarding the partition assignor: is there no way to change it when it comes to KStreams?Moonset
It seems you can find an answer here: #18203486Mako
@dmvkl: The thread you linked to covers how to customize the partitioner on producer side and how the default partition assignor on consumer side works. However, here the question is about customizing the partition assignor.Doty
@kambo: I answered your original question. For help with your Kafka Streams application, post to the Kafka user mailing list (kafka.apache.org/contact) or ask a new question here.Doty
@kambo, Similar question #55611588Toddy
13

Kafka Streams does not allow you to use a custom partition assignor. If you set one yourself, it will be overridden with the StreamsPartitionAssignor [1]. This is needed to ensure that, if possible, partitions are reassigned to the same consumers (a.k.a. stickiness) during rebalancing. Stickiness is important because it lets Kafka Streams reuse state stores on the consumer side as much as possible. If a partition is not reassigned to the same consumer, the state stores used by that consumer need to be recreated from scratch after rebalancing.

[1] https://github.com/apache/kafka/blob/9bd0d6aa93b901be97adb53f290b262c7cf1f175/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L989

Doty answered 17/4, 2019 at 18:57 Comment(2)
Thanks again. How will the application and Kafka behave when keys are changed inside the application? What I've seen so far is that some repartitioning is going on, and it gets logged heavily under load, which I don't want. Does this mean bad news?Moonset
Records are repartitioned between a key-changing operation and a key-based operation, for example a map that outputs a new key followed by a groupByKey. See the groupByKey javadocs.Doty
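To illustrate why a key change forces repartitioning: a record's partition is derived from its key, so once the key changes, the record may belong to a different partition than the one it was read from. The following is only a rough sketch using the Java standard library. It assumes a stand-in hash (String.hashCode) instead of Kafka's real default partitioner, and the class and method names are hypothetical.

```java
import java.util.Objects;

public class RepartitionSketch {

    // Stand-in for a key-based partitioner.
    // Assumption: the real Kafka default partitioner hashes the serialized
    // key differently; hashCode is used here only to keep the sketch small.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // True when a record that was read under oldKey would belong to a
    // different partition once it carries newKey.
    static boolean mustMove(String oldKey, String newKey, int numPartitions) {
        return partitionFor(oldKey, numPartitions) != partitionFor(newKey, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        String oldKey = "order-17";    // hypothetical key before the map
        String newKey = "customer-4";  // hypothetical key after the map

        System.out.println(oldKey + " -> partition " + partitionFor(oldKey, partitions));
        System.out.println(newKey + " -> partition " + partitionFor(newKey, partitions));
        System.out.println("needs repartitioning: " + mustMove(oldKey, newKey, partitions));
    }
}
```

Because a key-based operation such as groupByKey needs all records with the same key in the same partition, records whose new key maps elsewhere have to be written to a repartition topic first.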
0

Since Kafka 3.8, there is a task.assignor.class property: https://github.com/apache/kafka/blob/3.8/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L825
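A minimal sketch of how this property might be set, using plain java.util.Properties with string keys. The application id, the bootstrap address, and the class name com.example.MyTaskAssignor are hypothetical placeholders; the named class would have to be an implementation of whatever assignor interface this property expects.

```java
import java.util.Properties;

public class TaskAssignorConfigSketch {

    // Builds the properties that would be handed to the Kafka Streams application.
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "join-app");           // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        // Property named in the answer above; the class name is a placeholder.
        props.put("task.assignor.class", "com.example.MyTaskAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties().getProperty("task.assignor.class"));
    }
}
```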

Jasper answered 4/11 at 14:49 Comment(1)
Your answer could be improved with additional supporting information. Please edit to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers in the help center.Assizes
