Since you've not specified a particular client, I'll show you how this can be done with ksqlDB and the newly-added function, LATEST_BY_OFFSET
.
First, I populate the topic with source data:
kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
1:{ "userid": 1, "email": "[email protected]" }
1:{ "userid": 1, "email": "[email protected]" }
1:{ "userid": 1, "email": "[email protected]" }
EOF
Then in ksqlDB model this as a stream of events first:
ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> SET 'auto.offset.reset' = 'earliest'; [35/60]
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
+---------+---------+-----------------+
|ROWKEY |USERID |EMAIL |
+---------+---------+-----------------+
|1 |1 |[email protected] |
|1 |1 |[email protected] |
|1 |1 |[email protected] |
Now we can tell ksqlDB to take this stream of events and give us just the latest value (based on the offset), either directly:
ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
+--------------------+--------------------+
|USERID |KSQL_COL_1 |
+--------------------+--------------------+
|1 |[email protected] |
Press CTRL-C to interrupt
or more usefully, as materialised state within ksqlDB:
CREATE TABLE USER_LATEST_STATE AS
SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL
FROM USER_UPDATES
GROUP BY USERID
EMIT CHANGES;
This table is still driven by changes to the Kafka topic, but can be queried directly for the current state, either as of now ("pull query"):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
+--------------------+
|EMAIL |
+--------------------+
|[email protected] |
Query terminated
ksql>
or as a stream of changes as the state evolves ("push query"):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
+--------------------+
|EMAIL |
+--------------------+
|[email protected] |
[ query continues indefinitely ]