Kafka GlobalKTable Latency Issue
Asked Answered
W

2

6

I have a topic which is read as GlobalKTable and Materialized in a store. The issue is if I update a key on the topic and then read from store, for a while(~0.5sec) I get the old value.

What could be the reason for this issue?

Is it that globalktable stores the data in rocksDB per application instance so if the key on another partition is updated it takes some time to pull data from all partitions and update its local rocksDB. If not, please explain how does globalktable store maintain its state internally?

How can the above issue be resolved ? Should we not use globalktable in such scenarios where consistency is expected to match that of say a mysql database?

Winton answered 9/1, 2019 at 6:49 Comment(0)
S
7

Is it that globalktable stores the data in rocksDB per application instance so if the key on another partition is updated it takes some time to pull data from all partitions and update its local rocksDB. If not, please explain how does globalktable store maintain its state internally?

Absolutely yes. There is always some latency until Kafka Streams poll() the topic again and updates is local RocksDB.

Should we not use globalktable in such scenarios where consistency is expected to match that of say a mysql database?

It depends on what guarantees you need -- if the producer writes into the GlobalKTable topic and the write was successful, this does not guarantee that a Kafka Streams application has consumed this write and has updated the GlobalKTable. Producers and Consumers are decoupled in Kafka by design.

Saveall answered 9/1, 2019 at 12:56 Comment(5)
Are there any options? The last paragraph is exactly the situation I'm in.Raul
There no not much you can do atm. You can try to decrease latency on write and read by setting corresponding consumer/producer configs -- but it might of course reduce the throughput. It's a classic "latency vs throughput" and async processing problem.Saveall
Thanks a lot! I kind of feared this as an answer. Would you say it is okay to set latencies down to 1ms or what is the most aggressive value that but destroys throughout completely?Raul
Could I take advantage of exactly_once semantics and check the offset before accessing the store in any way? Would a switch to the Processor API be advisable?Raul
I don't think that exactly once would help. It's a fundamental problem of async processing. Note: for at-least-once and exactly-once case, offsets are only committed after a message is fully processed. Hence, checking committed offsets could help -- however, if you have repartition topics it's hard to impossible to track the offset relationship between those topic. Also, if offset X is committed and you query the store you may see an update from offset X+1 already.Saveall
U
0

It seems possible to create a KTable that functions similarly to a GlobalKTable. From a KStream instead of creating a GlobalKTable I used a custom StreamPartitioner and implemented partitions() to return a set of all partition numbers, effectively broadcasting the all the rows to every partition, and stored that as a KTable with a string key. Obviously this requires a small data set that all fits on each node as does GlobalKTable. Then I joined (regular join) another KTable to it using a key extractor function. In this case I wanted to lookup a constant key out of the simulated GlobalKTable, but I wanted to control the latency, which is easier with a KTable than a GlobalKTable.

Underling answered 15/2 at 20:36 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.