Consume the latest value from a topic for each key

I have a Kafka producer which produces messages at a high rate (the message key is, say, a username and the value is their current score in a game). The Kafka consumer is relatively slow at processing the consumed messages. My requirement is to show the most up-to-date score and avoid showing stale data, with the tradeoff that some scores may never be shown.
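
For concreteness, a minimal sketch of such a producer, assuming String keys and values and an illustrative topic name input-topic:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

// Keying by username keeps all of a user's score updates in one partition, in order
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("input-topic", "alice", "1500"));
}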

Essentially, for each username I may have hundreds of messages in the same partition, but I always want to read only the latest one.

A crude solution that has been implemented works like this: the producer sends just the key as each message, and the actual value is written to a database shared with the consumer. The consumer reads each key from the queue and the corresponding value from the database. The goal of always reading the latest value is achieved by the producer overwriting the value in the database, so a consumer reading a given key will in fact consume the latest value. But this solution has drawbacks due to the high number of reads and updates (it is slow, has race conditions, etc.).

I am looking for a more natural way of solving this in Kafka or Kafka Streams, where I can somehow get the latest value for each key from the stream of data. Thanks!

Bjorn answered 27/4, 2018 at 14:56 Comment(2)
Compacted Kafka topics always preserve the latest value for each key... Using a KTable in Kafka Streams, you can also perform roll-up operations over time windows, e.g. select the record with the highest timestamp in a window and persist it to a global KTable of latest values. – Searle
We've never used compacted topics, so I first need to learn a bit about them. Using Kafka Streams sounds more promising in this case. – Bjorn
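
On the first comment: a compacted topic keeps only the most recent record per key once log cleaning runs. A minimal sketch of creating one with the AdminClient (topic name, partition and replication counts are illustrative):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address

try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("input-topic", 3, (short) 1)
        // cleanup.policy=compact retains only the newest value per key
        .configs(Collections.singletonMap(
            TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
    admin.createTopics(Collections.singleton(topic));
}

Note that compaction runs lazily on older log segments, so it bounds how stale a rebuilt state can get but does not deduplicate a fast-moving stream in real time; that is what the KTable in the answer below addresses.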

The code below helped:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> dataTable = builder.table("input-topic"); // a table holds the latest value per key
dataTable.toStream().foreach((key, message) -> client.post(message)); // client: the poster's downstream sink
KafkaStreams streams = new KafkaStreams(builder, config); // config: the usual streams Properties
streams.start();
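
Note that KStreamBuilder was deprecated in Kafka 1.0; on current Kafka Streams versions the same logic (same topic, with client and config as above) looks like this:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> dataTable = builder.table("input-topic");
dataTable.toStream().foreach((key, message) -> client.post(message));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();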

What makes this work in practice is in-memory compaction of the incoming stream: the KTable's record cache coalesces successive updates to the same key before forwarding them downstream (details are explained in the Kafka Streams memory-management documentation). We can control how much buffering happens using the parameters cache.max.bytes.buffering and commit.interval.ms.
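
A minimal sketch of setting those two parameters, assuming a plain Properties-based configuration (application id and bootstrap servers are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-score-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// A larger cache and a longer commit interval coalesce more updates per key,
// so the downstream client sees fewer, fresher values
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

Setting cache.max.bytes.buffering to 0 disables the cache entirely, in which case every single update reaches the client.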

Bjorn answered 17/5, 2018 at 14:7 Comment(1)
Can you elaborate further? How does this solve the stated problem? – Centre
