Apache Flink - Partitioning the stream the same way as the input Kafka topic

I would like to implement in Apache Flink the following scenario:

Scenario

Given a Kafka topic with 4 partitions, I would like to process the intra-partition data independently in Flink, applying different logic depending on the event's type.

In particular, suppose the input Kafka topic contains the events depicted in the picture above. Each event has a different structure: partition 1 has the field "a" as its key, partition 2 has the field "b" as its key, and so on. In Flink I would like to apply different business logic depending on the event type, so I thought I should split the stream in some way. To achieve what's described in the picture, I thought of doing something like the following, using just one consumer (I don't see why I should use more):

FlinkKafkaConsumer<..> consumer = ...
DataStream<..> stream = flinkEnv.addSource(consumer);

stream.keyBy("a").map(new AEventMapper()).addSink(...);
stream.keyBy("b").map(new BEventMapper()).addSink(...);
stream.keyBy("c").map(new CEventMapper()).addSink(...);
stream.keyBy("d").map(new DEventMapper()).addSink(...);

(a) Is this correct? Also, I would like to process each Flink partition in parallel; since I only need in-order processing of events within the same Kafka partition, not a global ordering, (b) how can I do that? I know the method setParallelism() exists, but I don't know where to apply it in this scenario.

I'm looking for answers to the questions marked (a) and (b). Thank you in advance.

Ancipital asked 3/10, 2020 at 13:32 Comment(5)
It would help to understand what the pipeline is doing. What is the purpose of the keyBys, and what happens in the event mappers? – Jovanjove
You don't have to do things this way, but it's common to put messages of different types, which require different business logic, into different topics. Partitions are more often used for splitting up a topic by key, and for parallel processing. – Jovanjove
Hi David, thank you for the answer. Having four different kinds of messages on the same topic is a constraint; I'm not in control of it, I can only consume its events. I agree with you anyway; I would also create four different topics in that case. Each type of event must be processed in a different way. For example, "a" events must satisfy constraints that are different from "b" events, so I would like to partition my stream the same way as the input Kafka topic, since each type of element must be processed differently. – Ancipital
In the example each event can be identified by a specific key field, so I thought I could achieve the desired partitioning by applying a keyBy, but maybe that is the wrong way. – Ancipital
I would like to stress one more time that, if possible, I'm looking to process the four different types of events in parallel. – Ancipital

If you can build it like this, it will perform better:

[Diagram: four independent, chained pipelines, one per Kafka partition: source → mapper → sink, with no repartitioning between them]

Specifically, what I'm proposing is:

  1. Set the parallelism of the entire job to exactly match the number of Kafka partitions. Then each FlinkKafkaConsumer instance will read from exactly one partition.

  2. If possible, avoid using keyBy, and avoid changing the parallelism. Then the source, map, and sink will all be chained together (this is called operator chaining), and no serialization/deserialization and no networking will be needed (within Flink). Not only will this perform well, but you can also take advantage of fine-grained recovery (streaming jobs that are embarrassingly parallel can recover one failed task without interrupting the others).

  3. You can write a general-purpose EventMapper that checks what type of event is being processed, and then does whatever is appropriate. Or you can try to be clever and implement a RichMapFunction that, in its open(), figures out which partition is being handled, and loads the appropriate mapper. Both ideas are sketched after this list.
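
A minimal sketch of this layout, assuming four Kafka partitions and the legacy FlinkKafkaConsumer; MyEvent, MyEventSchema, chooseMapperFor, the topic name, and the broker address are all hypothetical placeholders, not part of the original answer:

import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerPartitionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // exactly the number of Kafka partitions

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092"); // made-up address

        // MyEventSchema (a DeserializationSchema<MyEvent>) is a hypothetical placeholder.
        FlinkKafkaConsumer<MyEvent> consumer =
                new FlinkKafkaConsumer<>("input-topic", new MyEventSchema(), kafkaProps);

        env.addSource(consumer)
                // Same parallelism end to end and no keyBy: source, map, and sink
                // are chained, so records never cross an operator (network) boundary.
                .map(new PerPartitionMapper())
                .addSink(new DiscardingSink<>()); // stand-in for the real sinks

        env.execute("per-partition-processing");
    }

    // The "clever" variant from point 3: choose the business logic once, in open().
    public static class PerPartitionMapper extends RichMapFunction<MyEvent, MyEvent> {

        private transient MapFunction<MyEvent, MyEvent> delegate;

        @Override
        public void open(Configuration parameters) {
            // With parallelism == partition count, each subtask reads exactly one
            // Kafka partition. The partition-to-subtask assignment is deterministic
            // but not necessarily subtask i -> partition i, so dispatching on the
            // event itself (the general-purpose variant) is the safer choice.
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            delegate = chooseMapperFor(subtask); // hypothetical factory, e.g. returning new AEventMapper()
        }

        @Override
        public MyEvent map(MyEvent event) throws Exception {
            return delegate.map(event);
        }

        private static MapFunction<MyEvent, MyEvent> chooseMapperFor(int subtask) {
            return event -> event; // placeholder: return the mapper for this subtask's event type
        }
    }
}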

Jovanjove answered 4/10, 2020 at 13:39 Comment(13)
Thank you very much David, crystal clear as always. – Ancipital
Hi David, suppose the data are already distributed as in the diagram; does Flink still create per-partition watermarks? And in this case, are these watermarks independent? I mean, the EventMapper has no multiple upstream input channels, so there is no "take the minimum of the incoming watermarks" step. – Ushas
When watermarking is done by the FlinkKafkaConsumer, it does per-partition watermarking. Of course, if each consumer instance has only one partition, the distinction is moot. – Jovanjove
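
A minimal sketch of attaching per-partition watermarking to the consumer, assuming the WatermarkStrategy API of Flink 1.11+; MyEvent and getTimestampMillis() are hypothetical:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Applied on the consumer, the strategy runs inside the Kafka source, per
// partition, before the per-partition streams are merged downstream.
WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, recordTimestamp) -> event.getTimestampMillis());

consumer.assignTimestampsAndWatermarks(strategy);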
Hi David. With this solution, are we always guaranteed to get the same data from each partition in the same mapper function, especially in the event of a failure? I have a similar problem, where I want to process and maintain state based on a key present on each message. Thankfully, each partition is segregated by this key. – Mute
Yes, this will be the case as long as the parallelism remains unchanged. – Jovanjove
@DavidAnderson how do you make Flink read data from n partitions with k*n workers in parallel? Spark has an option for this, called minPartitions; I think Flink also has KafkaPartitionSplit for this, but how do you use it? – Warnock
@pavel_orekhov AFAIK, this is not possible. – Jovanjove
@DavidAnderson I see, thanks for your answer. I hope Flink gets this eventually; it might improve the speed in certain situations. – Warnock
I can see this being beneficial for batch processing, but for streaming I think it would be too much hassle to support; I would suggest instead that users re-partition their topic(s). – Jovanjove
You're saying this because usually in streaming applications the lag between Flink and the Kafka end offsets is small, and there's really no need to divvy up small chunks (10-50 elements) into even smaller chunks? – Warnock
I don't think the additional complexity would add any value. – Jovanjove
And now that I think about it, for batch processing Flink can already support this (though not very elegantly). E.g., you could define several different sources that read different offset ranges; a sketch follows this thread. – Jovanjove
@DavidAnderson you're right, I agree with you. Thank you for your opinion. – Warnock
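
A minimal sketch of that batch workaround, assuming the newer KafkaSource (Flink 1.12+); the topic name, broker address, and offset boundaries are made up:

import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// First of two bounded sources splitting partition 0 of "events" at offset 500000.
Map<TopicPartition, Long> start = Map.of(new TopicPartition("events", 0), 0L);
Map<TopicPartition, Long> end   = Map.of(new TopicPartition("events", 0), 500_000L);

KafkaSource<String> firstRange = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")          // made-up address
        .setTopics("events")
        .setStartingOffsets(OffsetsInitializer.offsets(start))
        .setBounded(OffsetsInitializer.offsets(end)) // stop when the range is exhausted
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// A second source would start at offset 500000; unioning the two streams reads
// one partition with two parallel source instances.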
