Apache Kafka order windowed messages based on their value
I'm trying to find a way to re-order messages within a topic partition and send ordered messages to a new topic.

I have a Kafka publisher that sends String messages in the following format: {system_timestamp}-{event_name}?{parameters}

for example:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatParticipants=3

We also add a message key to each message so that it is sent to the corresponding partition.

What I want to do is reorder events based on the {system-timestamp} part of the message, within a 1-minute window, because our publishers don't guarantee that messages are sent in accordance with the {system-timestamp} value.

For example, a message with a bigger {system-timestamp} value may be delivered to the topic first.

I've investigated Kafka Stream API and found some examples regarding messages windowing and aggregation:

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("events");
KGroupedStream<String, String> groupedStream = stream.groupByKey(); // grouped events within partition

/* commented out since I don't think I need any aggregation,
   but I guess without aggregation I can't use time windowing:
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
        () -> "",  // initial value
        (aggKey, value, aggregate) -> aggregate + "",  // aggregating value
        TimeWindows.of(1000), // interval in milliseconds
        Serdes.String(), // serde for aggregated value
        "test-store"
);*/

But what should I do next with this grouped stream? I don't see any sort()-like method available, e.g. sort((e1, e2) -> e1.compareTo(e2)). Windows can be applied to methods like aggregate(), reduce(), and count(), but I don't think I need to manipulate the message data.

How can I re-order messages within a 1-minute window and send them to another topic?

Minded answered 12/5, 2017 at 13:55 Comment(0)
Here's an outline:

Create a Processor implementation that:

  • in the process() method, for each message:

    • reads the timestamp from the message value
    • inserts into a KeyValueStore using (timestamp, message-key) pair as the key and the message-value as the value. NB this also provides de-duplication. You'll need to provide a custom Serde to serialize the key so that the timestamp comes first, byte-wise, so that ranged queries are ordered by timestamp first.
  • in the punctuate() method:

    • reads the store using a ranged fetch from 0 to timestamp - 60,000 (= 1 minute)
    • sends the fetched messages in order using context.forward() and deletes them from the store

The problem with this approach is that punctuate() is not triggered if no new messages arrive to advance the "stream time". If this is a risk in your case, you can create an external scheduler that sends periodic "tick" messages to each(!) partition of your topic; your processor should just ignore them, but they will cause punctuate() to trigger in the absence of "real" messages. KIP-138 will address this limitation by adding explicit support for system-time punctuation: https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
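The outline above can be sketched in plain Java. To keep it self-contained and runnable, this sketch simulates the KeyValueStore with a TreeMap and simulates context.forward() by returning a list; in a real Processor you would use a persistent store and a custom Serde. The zero-padded key encoding shows the point made above: the timestamp must come first, byte-wise, so that a ranged fetch returns records in timestamp order. Class and method names here are illustrative, not from any Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReorderSketch {
    // Simulated ordered state store; in Kafka Streams this would be a
    // persistent KeyValueStore whose key Serde puts the timestamp first.
    private final TreeMap<String, String> store = new TreeMap<>();

    // Zero-pad the timestamp so lexicographic order equals numeric order.
    static String storeKey(long timestamp, String messageKey) {
        return String.format("%020d|%s", timestamp, messageKey);
    }

    // process(): parse the timestamp from the value and buffer the record
    // under a (timestamp, message-key) composite key.
    void process(String messageKey, String value) {
        long ts = Long.parseLong(value.substring(0, value.indexOf('-')));
        store.put(storeKey(ts, messageKey), value);
    }

    // punctuate(): flush everything older than (now - 1 minute), in
    // timestamp order, and delete it from the store.
    List<String> punctuate(long nowMs) {
        String cutoff = storeKey(nowMs - 60_000L, "");
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : store.headMap(cutoff).entrySet()) {
            out.add(e.getValue()); // context.forward(...) in a real Processor
        }
        store.headMap(cutoff).clear();
        return out;
    }
}
```

Because the composite key includes the message key, two messages with the same timestamp but different keys do not collide, which addresses the de-duplication remark above.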

Osman answered 3/6, 2017 at 15:10 Comment(3)
Having thought about it some more, this won't work well when the producer emits multiple messages with the same key within the same millisecond. So, the key should instead be a (timestamp, some-unique-key) pair, or the value should be a collection. – Osman
Assuming that the producer already provides the correct order of the messages, this should be no problem, since messages with the same key target the same partition and are consumed in the same order as they were produced. – Selfhood
Sketched up an example following this great instruction: github.com/confluentinc/kafka-streams-examples/issues/179 – Conchitaconchobar
Here is how I ordered streams in my project:

  1. Created a topology with a source, a processor, and a sink.
  2. In the Processor:
    1. process(key, value) -> adds each record to a List (instance variable).
    2. init() -> schedule(WINDOW_BUFFER_TIME, WALL_CLOCK_TIME) -> punctuate(timestamp) sorts the items buffered during the window in the List (instance variable), iterates over them and forwards each one, then clears the List.

This logic is working fine for me.
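The steps above can be sketched in plain Java with the Kafka-specific pieces stubbed out: the buffer is an instance-variable List as described, and punctuate() returns the sorted records instead of calling context.forward(). WINDOW_BUFFER_TIME and WALL_CLOCK_TIME refer to the schedule() arguments mentioned above and are not modeled here; the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class WindowBufferSketch {
    // Instance-variable buffer, as in the answer. Note: in a real topology
    // this should be backed by a state store so it survives crashes.
    private final List<String> buffer = new ArrayList<>();

    // Extract the {system_timestamp} prefix from a message value.
    static long timestampOf(String value) {
        return Long.parseLong(value.substring(0, value.indexOf('-')));
    }

    // process(key, value): just collect the record.
    void process(String key, String value) {
        buffer.add(value);
    }

    // punctuate(): sort the buffered window by timestamp, emit, clear.
    List<String> punctuate() {
        List<String> ordered = new ArrayList<>(buffer);
        ordered.sort(Comparator.comparingLong(WindowBufferSketch::timestampOf));
        buffer.clear();
        return ordered; // each element would be context.forward()-ed
    }
}
```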

Ozan answered 25/4, 2019 at 9:32 Comment(1)
It is wise to back that collection with a StateStore so that the buffered events are not lost if the app crashes. Here is an example where you can lose the buffer: https://mcmap.net/q/1165359/-can-i-rely-on-a-in-memory-java-collection-in-kafka-stream-for-buffering-events-by-fine-tuning-punctuate-and-commit-interval. I have documented how to sort events using a state store here: github.com/vinodhinic/kstream-sorting – Terrific