Is it possible to get the latest value for a message key from Kafka messages?
Suppose I have different values for the same message key.

For example:

{ "userid": 1, "email": "[email protected]" }

{ "userid": 1, "email": "[email protected]" }

{ "userid": 1, "email": "[email protected]" }

In the above case I want only the latest value provided by the user, that is, '[email protected]'.

My Kafka stream should give me only the third value, not the previous two.

Teenager answered 1/4, 2020 at 6:37
Since you've not specified a particular client, I'll show you how this can be done with ksqlDB and the newly added function LATEST_BY_OFFSET.

First, I populate the topic with source data:

kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
1:{ "userid": 1, "email": "[email protected]" }
1:{ "userid": 1, "email": "[email protected]" }
1:{ "userid": 1, "email": "[email protected]" }
EOF

Then in ksqlDB, first model this as a stream of events:

ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
+---------+---------+-----------------+
|ROWKEY   |USERID   |EMAIL            |
+---------+---------+-----------------+
|1        |1        |[email protected]  |
|1        |1        |[email protected]  |
|1        |1        |[email protected]  |

Now we can tell ksqlDB to take this stream of events and give us just the latest value (based on the offset), either directly:

ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
+--------------------+--------------------+
|USERID              |KSQL_COL_1          |
+--------------------+--------------------+
|1                   |[email protected]     |

Press CTRL-C to interrupt

or more usefully, as materialised state within ksqlDB:

CREATE TABLE USER_LATEST_STATE AS 
    SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL 
      FROM USER_UPDATES 
     GROUP BY USERID 
     EMIT CHANGES;

This table is still driven by changes to the Kafka topic, but can be queried directly for the current state, either as of now ("pull query"):

ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
+--------------------+
|EMAIL               |
+--------------------+
|[email protected]     |
Query terminated
ksql>

or as a stream of changes as the state evolves ("push query"):

ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
+--------------------+
|EMAIL               |
+--------------------+
|[email protected]     |

[ query continues indefinitely ]


Yankee answered 1/4, 2020 at 8:53
When I create the table I see multiple records with the same key. Is this intentional? – Amoretto
It seems that you want to buffer the records before further processing. In streaming you have an ever-growing, unbounded data set, so you never know whether you should wait for more records or flush the buffer for further processing. Can you add more details about how you will process these records?

You can introduce an additional parameter: the maximum time you want to wait before flushing the buffer. To achieve this you can use a session window or a tumbling window, use the record cache in combination with a commit interval, or implement it with the Kafka Streams low-level Processor API. A sketch of the cache-based approach follows.
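For illustration, here is a minimal sketch of the cache-based approach (the topic name test_topic, broker address, application id, and the String serdes are assumptions, not from the original answer). Reading the topic as a KTable keeps only the latest value per key, and the record cache together with the commit interval controls how often de-duplicated updates are forwarded downstream:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;

public class LatestValuePerKey {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-value-app");  // assumed app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:29092");   // assumed broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // A bigger cache and a longer commit interval mean more de-duplication
        // happens before updates are forwarded downstream.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);

        StreamsBuilder builder = new StreamsBuilder();
        // Reading the topic as a changelog (KTable) keeps only the latest value
        // per key; values are treated as opaque strings in this sketch.
        KTable<String, String> latestByKey = builder.table("test_topic");
        latestByKey.toStream().foreach((userId, email) ->
                System.out.printf("latest for %s: %s%n", userId, email));

        new KafkaStreams(builder.build(), props).start();
    }
}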

Here is example code showing how you can achieve this with a tumbling window: it aggregates all the userId info in a one-hour window, accepts events that arrive up to 10 minutes late, and sends only the suppressed (final) events downstream. Note that with suppression you may not get the final result until a new event arrives to advance stream time:

userInfoKStream
    .groupByKey()
    // 1-hour tumbling windows, accepting events up to 10 minutes late
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
    // keep only the most recent value seen for each key within the window
    .aggregate(() -> "", (userId, newValue, currentValue) -> newValue)
    // emit a single final result per window rather than every intermediate update
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .foreach((userId, value) -> { /* process the latest value per user */ });
Aretha answered 1/4, 2020 at 8:21
You need Kafka log compaction. In short, if you want your topic to keep just the last value for a specific key, set the topic config cleanup.policy=compact (log.cleanup.policy=compact is the broker-wide default). You can find more about it in the Kafka documentation.
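As an illustration, a compacted topic could be created like this with the standard Kafka CLI (the topic name, partition and replication settings, and broker address are assumptions):

# Retain only the latest record per key once the broker's log cleaner runs
kafka-topics --bootstrap-server broker:29092 --create \
    --topic test_topic --partitions 1 --replication-factor 1 \
    --config cleanup.policy=compact

Note that compaction runs asynchronously in the broker's log cleaner, so consumers can still see older values for a key until cleaning has occurred.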

Minute answered 1/4, 2020 at 8:36
Log compaction is a cleanup mechanism to delete old messages with the same key, but it does not help you to get the "currently newest message" for a specific key. – Derosier
