Kafka Log Compacted Topic Duplication Values against same key not deleted
Asked Answered
M

3

5

Log compacted topics are not supposed to keep duplicates against the same key. But in our case, when a new value with the same key is sent, the previous one isn't deleted. What could be the issue?

val TestCompactState: KTable[String, TestCompact] = builder.table[String, TestCompact](kafkaStreamConfigs.getString("testcompact-source"),
   (TestCompactmaterialized).withKeySerde(stringSerde).withValueSerde(TestCompactSerde)) 

what i get Actual Result

Offsets      Keys        Messages
5            {"id":5}   {"id":5,"namee":"omer","__deleted":"false"}
6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}

I just want latest one record against same key Expected Result

6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}
Mihe answered 10/4, 2020 at 13:2 Comment(0)
K
6

There could be several reasons for this behavior. The compaction cleanup policy does not run after every single incoming message. Instead, there is the broker configuration

log.cleaner.min.compaction.lag.ms: The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.

Type: long; Default: 0; Valid Values: ; Update Mode: cluster-wide

This defaults to 0 so that might not be the reason but worth checking.

It is important to note that the compact policy never compacts the current segment. Messages are eligble for compaction only on inactive segments. Make sure to validate

log.segment.bytes: The maximum size of a single log file

Type: int; Default: 1073741824; Valid Values: [14,...]; Update Mode: cluster-wide

The compaction will usually be triggered by the data that is in the current ("dirty") segement of the log. The term "dirty" comes from uncleaned/uncompacted. There is another configuration that helps steer the compaction.

log.cleaner.min.cleanable.ratio: The minimum ratio of dirty log to total log for a log to eligible for cleaning. If the log.cleaner.max.compaction.lag.ms or the log.cleaner.min.compaction.lag.ms configurations are also specified, then the log compactor considers the log eligible for compaction as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) records for at least the log.cleaner.min.compaction.lag.ms duration, or (ii) if the log has had dirty (uncompacted) records for at most the log.cleaner.max.compaction.lag.ms period.

Type: double; Default: 0.5; Valid Values: ;Update Mode: cluster-wide

Per default, the deletion lag for a message to be compacted is quite high as the following configuration description shows.

log.cleaner.max.compaction.lag.ms: The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted.

Type: long; Default: 9223372036854775807; Valid Values: ; Update Mode: cluster-wide

To summarize, there could be several reason why you are observing what you have described. And it is very important to be aware that a compacted topic does not provide any guarantees to have duplicate message for the same key. It can only guarantee that "at least" the latest message for the same key is kept.

There is a nice blog that explains the log compaction in more detail.

Knickknack answered 10/4, 2020 at 14:47 Comment(0)
D
5

As far as I know it is not possible to apply a log compaction policy in order to keep exactly one message per key. Even if you set cleanup.policy=compact (topic-level) or log.cleanup.policy=compact (global level), there is no guarantee that only the latest message will be kept and older ones will be compacted.

According to the official Kafka documentation:

Log compaction gives us a more granular retention mechanism so that we are guaranteed to retain at least the last update for each primary key

Detestation answered 10/4, 2020 at 13:41 Comment(0)
N
1

The active segment of a partition is never compacted, so it might take some time and more messages being sent to the topic before it starts removing older duplicates.

Narrowminded answered 25/5, 2021 at 21:59 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.