Event-Time merge of two Kafka topics using Kafka Streams DSL

I am looking for a way to merge two Kafka topics based on the event time.

For example, I have two topics whose records follow the schema {event-key} :: {event-time-as-value}:

topic I -  { {1 :: 12:00pm} {2 :: 12:10pm} {3 :: 14:50pm} {4 :: 15:00pm} }
topic II - { {1 :: 13:00pm} {2 :: 13:10pm} {3 :: 15:50pm} {4 :: 16:00pm} }

The expected output should look like this:

{ {1 :: 12:00pm} {2 :: 12:10pm} {1 :: 13:00pm} {2 :: 13:10pm} {3 :: 14:50pm} {4 :: 15:00pm} {3 :: 15:50pm} {4 :: 16:00pm} }

Is there a way to do it using Kafka Streams DSL?

Note: there is a good chance that the original topics are not ordered by event time, and that's OK. I would like the algorithm to always pick the earlier of the two events currently at the head of each topic (the same way the merge step for two sorted arrays works).

Skvorak answered 4/9, 2019 at 11:45 Comment(0)

Kafka Streams (as of version 2.1.0) implements the exact algorithm you describe. Hence, a simple topology such as:

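import java.util.Arrays;
import org.apache.kafka.streams.StreamsBuilder;

StreamsBuilder builder = new StreamsBuilder();
builder
    .stream(Arrays.asList("firstInputTopic", "secondInputTopic")) // read both topics as one stream
    .to("outputTopicName");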

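should do what you want. Note that the program merges data on a per-partition basis: records are ordered by timestamp among the input partitions handled by the same task, not across all partitions of the topics.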
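
To make the selection rule concrete, here is a minimal sketch in plain Java of "always emit whichever head record has the smaller timestamp". It is only an illustration of the idea from the question, not Kafka Streams' internal code; the class and method names (HeadMerge, Event, merge, at) are hypothetical, and event times are assumed to be expressed as minutes since midnight.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustration only: mimics "pick the head record with the smallest timestamp".
// HeadMerge and Event are hypothetical names, not part of any Kafka API.
final class HeadMerge {

    // A record reduced to its key and its event time in minutes since midnight.
    static final class Event {
        final int key;
        final int minutes;

        Event(int key, int minutes) {
            this.key = key;
            this.minutes = minutes;
        }

        @Override
        public String toString() {
            return String.format("%d::%02d:%02d", key, minutes / 60, minutes % 60);
        }
    }

    // Convenience factory: key plus hour and minute of the event time.
    static Event at(int key, int hour, int minute) {
        return new Event(key, hour * 60 + minute);
    }

    // Repeatedly emits whichever head is earlier; ties go to the first input.
    // Only the two heads are compared, so unsorted inputs are not re-sorted.
    static List<Event> merge(List<Event> first, List<Event> second) {
        Deque<Event> a = new ArrayDeque<>(first);
        Deque<Event> b = new ArrayDeque<>(second);
        List<Event> out = new ArrayList<>();
        while (!a.isEmpty() || !b.isEmpty()) {
            boolean takeA = b.isEmpty()
                || (!a.isEmpty() && a.peekFirst().minutes <= b.peekFirst().minutes);
            out.add(takeA ? a.pollFirst() : b.pollFirst());
        }
        return out;
    }
}
```

Feeding it the two example topics from the question reproduces the order given there as the expected output. Because only the heads are compared, an input that is itself out of order stays out of order in the result, which matches the behaviour the question says is acceptable.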

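Also consider the max.task.idle.ms configuration, which controls how long a task waits for data on an empty input partition before it goes ahead and processes records from the partitions that already have data.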
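
As a rough sketch of where that setting goes, the properties passed to the KafkaStreams constructor might be built like this. The application id, the broker address and the helper name MergeAppConfig are assumptions made for the example; only the key names are real Kafka Streams settings.

```java
import java.util.Properties;

// Hypothetical helper that assembles the settings for the merge application.
final class MergeAppConfig {

    static Properties build(long maxTaskIdleMs) {
        Properties props = new Properties();
        props.put("application.id", "event-time-merge");   // assumed application name
        props.put("bootstrap.servers", "localhost:9092");  // assumed local broker
        // Time a task waits on an empty input partition before processing
        // the records already buffered from its other partitions.
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }
}
```

A larger value gives a slow topic more time to catch up, so the merge is less likely to emit records out of timestamp order, at the cost of added latency.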

For more details read the corresponding KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

Additionally, you need to implement and configure a custom TimestampExtractor that gets the timestamp from the value.
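
The core of such an extractor is turning the record value into a timestamp. The sketch below shows only that step in plain Java, assuming the value holds epoch milliseconds as text (the question does not say how the event time is actually encoded). ValueTimestamps and fromValue are hypothetical names.

```java
// Hypothetical helper: the parsing step a custom TimestampExtractor could delegate to.
final class ValueTimestamps {

    // Returns the event time carried in the value, or the fallback when the
    // value is missing, not a number, or negative.
    static long fromValue(Object value, long fallback) {
        if (value == null) {
            return fallback;
        }
        try {
            long ts = Long.parseLong(value.toString().trim());
            return ts >= 0 ? ts : fallback;
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    // Inside a class implementing org.apache.kafka.streams.processor.TimestampExtractor,
    // the single method extract(ConsumerRecord<Object, Object> record, long partitionTime)
    // could then simply do:
    //     return ValueTimestamps.fromValue(record.value(), partitionTime);
}
```

The extractor class is registered through the default.timestamp.extractor setting, or per source via Consumed.with(...).withTimestampExtractor(...).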

Mariande answered 4/9, 2019 at 15:4 Comment(0)

You can subscribe to multiple topics in a single stream, as in the code below (this uses Spark Streaming's Kafka integration rather than Kafka Streams):

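import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Consumer settings shared by both topic subscriptions.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-notes",
  "auto.offset.reset" -> "earliest")

// ssc is an existing StreamingContext; topic names are the ones from the question.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferBrokers,
  ConsumerStrategies.Subscribe[String, String](Set("topic I", "topic II"), kafkaParams))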
Hirundine answered 15/6, 2023 at 9:29 Comment(0)
