Effective strategy to avoid duplicate messages in an Apache Kafka consumer

I have been studying Apache Kafka for a month now. However, I am now stuck at one point. My use case is this: I have two or more consumer processes running on different machines. I ran a few tests in which I published 10,000 messages to the Kafka server, then killed one of the consumer processes while the messages were being processed and restarted it. The consumers were writing processed messages to a file, and after consumption finished the file contained more than 10k messages, so some messages were duplicated.

In the consumer processes I have disabled auto-commit; the consumers commit offsets manually, batch-wise. So, for example, once 100 messages are written to the file, the consumer commits the offsets. When a single consumer process is running and it crashes and recovers, duplication is avoided in this manner. But when more than one consumer is running and one of them crashes and recovers, it writes duplicate messages to the file.

Is there any effective strategy to avoid these duplicate messages?

Levitate asked 15/4, 2015 at 10:20 Comment(2)
I fail to see how the duplication issue is avoided in the single consumer case. Can you help me understand please?Miosis
confluent.io/blog/…Unseal

The short answer is, no.

What you're looking for is exactly-once processing. While it may often seem feasible, it should never be relied upon because there are always caveats.

Even to attempt to prevent duplicates you would need to use the simple consumer. With this approach, each consumer, when it consumes a message from some partition, writes the partition and offset of the consumed message to disk. When the consumer restarts after a failure, it reads the last consumed offset for each partition from disk.

But even with this pattern the consumer can't guarantee it won't reprocess a message after a failure. What if the consumer consumes a message and then fails before the offset is flushed to disk? If you write to disk before you process the message, what if you write the offset and then fail before actually processing the message? This same problem would exist even if you were to commit offsets to ZooKeeper after every message.

There are some cases, though, where exactly-once processing is more attainable, but only for certain use cases. This simply requires that your offset be stored in the same location as your application's output. For instance, if you write a consumer that counts messages, by storing the last counted offset with each count you can guarantee that the offset is stored at the same time as the consumer's state. Of course, in order to guarantee exactly-once processing this would require that you consume exactly one message and update the state exactly once for each message, and that's completely impractical for most Kafka consumer applications. By its nature Kafka consumes messages in batches for performance reasons.
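
As a minimal sketch of that counting example (not from the original answer): assume a single assigned partition and a modern KafkaConsumer driven by assign/seek; the topic name and checkpoint file path are illustrative. The count and the next offset go into the same file and are swapped in atomically by a rename, so state and position cannot diverge:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CountingConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.auto.commit", "false"); // the checkpoint file, not Kafka, owns the offset
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("events", 0);
        Path checkpoint = Paths.get("counter.checkpoint"); // file content: "<count> <nextOffset>"

        long count = 0, nextOffset = 0;
        if (Files.exists(checkpoint)) {
            String[] parts = new String(Files.readAllBytes(checkpoint), StandardCharsets.UTF_8).trim().split(" ");
            count = Long.parseLong(parts[0]);
            nextOffset = Long.parseLong(parts[1]);
        }

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, nextOffset); // resume exactly where the last checkpoint left off

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> rec : records) {
                    count++;                       // the application "output"
                    nextOffset = rec.offset() + 1; // the position that goes with it
                }
                if (!records.isEmpty()) {
                    // Write count and offset together, then atomically replace the old checkpoint.
                    Path tmp = Paths.get("counter.checkpoint.tmp");
                    Files.write(tmp, (count + " " + nextOffset).getBytes(StandardCharsets.UTF_8));
                    Files.move(tmp, checkpoint, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
                }
            }
        }
    }
}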

Usually your time will be better spent, and your application will be much more reliable, if you simply design it to be idempotent.

Costello answered 15/4, 2015 at 10:20 Comment(2)
What is the real benefit of this "exactly-once scenario" compared to enabling auto-commit? Under what scenarios and cases will it help? In my case I will have multiple consumers running on different machines, consuming data from the same topic, which has multiple partitions, and I want to eliminate the possibility of missing messages and also reduce the number of messages duplicated during rebalancing.Sector
Receiving duplicate messages is OK in my case, as my system can handle it, but I cannot lose data at all, so I wanted to see whether this approach gives any benefit by managing offsets manually, either on disk or in some database.Sector

This is what the Kafka FAQ has to say on the subject of exactly-once:

How do I get exactly-once messaging from Kafka?

Exactly once semantics has two parts: avoiding duplication during data production and avoiding duplicates during data consumption.

There are two approaches to getting exactly once semantics during data production:

  • Use a single-writer per partition and every time you get a network error check the last message in that partition to see if your last write succeeded
  • Include a primary key (UUID or something) in the message and deduplicate on the consumer.

If you do one of these things, the log that Kafka hosts will be duplicate-free. However, reading without duplicates depends on some co-operation from the consumer too. If the consumer is periodically checkpointing its position then if it fails and restarts it will restart from the checkpointed position. Thus if the data output and the checkpoint are not written atomically it will be possible to get duplicates here as well. This problem is particular to your storage system. For example, if you are using a database you could commit these together in a transaction. The HDFS loader Camus that LinkedIn wrote does something like this for Hadoop loads. The other alternative that doesn't require a transaction is to store the offset with the data loaded and deduplicate using the topic/partition/offset combination.

I think there are two improvements that would make this a lot easier:

  • Producer idempotence could be done automatically and much more cheaply by optionally integrating support for this on the server.
  • The existing high-level consumer doesn't expose a lot of the more fine grained control of offsets (e.g. to reset your position). We will be working on that soon
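
As a minimal sketch of the FAQ's "commit these together in a transaction" idea (not from the FAQ itself), assuming a JDBC store with illustrative tables events(payload) and kafka_offsets(topic, partition_id, next_offset); on startup the consumer would read next_offset from kafka_offsets and seek() to it:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class TransactionalSink {
    // Write one record's output and advance the stored offset in a single transaction.
    static void process(Connection conn, ConsumerRecord<String, String> rec) throws SQLException {
        conn.setAutoCommit(false);
        try (PreparedStatement insert = conn.prepareStatement(
                     "INSERT INTO events(payload) VALUES (?)");
             PreparedStatement offset = conn.prepareStatement(
                     "UPDATE kafka_offsets SET next_offset = ? WHERE topic = ? AND partition_id = ?")) {
            insert.setString(1, rec.value());
            insert.executeUpdate();

            offset.setLong(1, rec.offset() + 1); // assumes the offset row was seeded beforehand
            offset.setString(2, rec.topic());
            offset.setInt(3, rec.partition());
            offset.executeUpdate();

            conn.commit();   // output and offset become visible together, or not at all
        } catch (SQLException e) {
            conn.rollback(); // after a restart the consumer seeks back to next_offset and reprocesses
            throw e;
        }
    }
}
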
Miosis answered 23/4, 2015 at 23:57 Comment(1)
Receiving duplicate messages is OK in my case, as my system can handle it, but I cannot lose data at all, so I wanted to see whether this approach gives any benefit by managing offsets manually, either on disk or in some database.Sector

I agree with RaGe's point about deduplicating on the consumer side. We use Redis to deduplicate Kafka messages.

Assume the Message class has a member called 'uniqId', which is filled in on the producer side and is guaranteed to be unique. We use a 12-character random string (regexp '^[A-Za-z0-9]{12}$').

The consumer side uses Redis's SETNX to deduplicate and EXPIRE to purge expired keys automatically. Sample code:

Message msg = ... // e.g. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // e.g. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix the key name at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val); // 1 if the key was newly set, 0 if it already existed
if (rsps <= 0) {
    log.warn("kafka dup: {}", msg.toJson()); // duplicate: skip it, or apply other logic
} else {
    jedis.expire(key, 7200); // 2 hours is OK for a production environment
}

The above code did detect duplicate messages several times when Kafka (version 0.8.x) had problems. With our input/output balance audit log, no message was lost or duplicated.

Grandniece answered 12/5, 2016 at 10:23 Comment(4)
This won't work in the case of retries: you'll treat them as duplicates when they should be retried.Freedwoman
What if, right after the jedis.setnx() command, the consumer crashes or the network fails before it finishes its processing task? I guess we should take this small risk?Yaroslavl
@user1955934 You can use a Redis transaction.Percussionist
What happens when Redis goes down? Does the whole processing stop?Verbenia

There's a relatively new 'Transactional API' now in Kafka that can allow you to achieve exactly once processing when processing a stream. With the transactional API, idempotency can be built in, as long as the remainder of your system is designed for idempotency. See https://www.baeldung.com/kafka-exactly-once
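
A hedged sketch of the read-process-write pattern on top of the transactional API (Kafka 0.11+; the groupMetadata() overload used here needs a 2.5+ client). Topic names, the group id, and the transactional.id are illustrative, and error handling (abortTransaction on failure) is omitted:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ExactlyOnceRelay {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cp.put(ConsumerConfig.GROUP_ID_CONFIG, "relay-group");
        cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        cp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // don't read aborted data
        cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties pp = new Properties();
        pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "relay-1"); // must be stable per producer instance
        pp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) continue;

                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> rec : records) {
                    producer.send(new ProducerRecord<>("output-topic", rec.key(), rec.value()));
                    offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                                new OffsetAndMetadata(rec.offset() + 1));
                }
                // Consumed offsets are committed as part of the same transaction as the output records.
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            }
        }
    }
}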

Papyraceous answered 22/12, 2019 at 20:14 Comment(1)
This applies only if the producer is using the transactional API; otherwise the consumer cannot benefit from this pattern.Maltose

Whatever is done on the producer side, we still believe the best way to deliver exactly-once from Kafka is to handle it on the consumer side (a rough sketch follows the list):

  1. Produce the message with a UUID as the Kafka message key into topic T1.
  2. On the consumer side, read the message from T1 and write it to HBase with the UUID as the row key.
  3. Read back from HBase with the same row key and write to another topic T2.
  4. Have your end consumers actually consume from topic T2.
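
A rough sketch of steps 2 and 3 with the HBase Java client; the table name ("messages") and column ("d:payload") are illustrative assumptions, and the actual Kafka consume/produce calls are elided:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseDedupRelay {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("messages"))) {

            String uuid = java.util.UUID.randomUUID().toString(); // stands in for the key read from T1
            byte[] row = Bytes.toBytes(uuid);

            // Step 2: write the message under its UUID row key; re-writing the same
            // row on a retry is harmless because HBase simply overwrites the cell.
            Put put = new Put(row);
            put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("payload"), Bytes.toBytes("message body"));
            table.put(put);

            // Step 3: read the row back and forward exactly that value to topic T2.
            Result result = table.get(new Get(row));
            byte[] payload = result.getValue(Bytes.toBytes("d"), Bytes.toBytes("payload"));
            // ... produce `payload` to topic T2 here ...
        }
    }
}
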
Fidelfidela answered 25/4, 2019 at 21:6 Comment(0)

As mentioned above, a good solution is to add an ID to every message and store the IDs of the last n processed messages in the consumer's database. This solution is described in the book Microservices Patterns by Chris Richardson, Chapter 3.3.6.

https://www.amazon.com/Microservices-Patterns-examples-Chris-Richardson/dp/1617294543

Trawler answered 27/12, 2023 at 6:12 Comment(0)

Another alternative solution would be:

  1. Produce a message with a unique key into the topic test.
  2. On the consumer side, insert an entry with the unique key into a DB once processing completes successfully. Before starting to process a message, check for the unique key in the DB. If it exists, the consumer has already processed that message; otherwise, process it (a sketch follows).
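
A minimal sketch of that check-then-record idea, assuming a JDBC table processed_messages(message_key VARCHAR PRIMARY KEY); all names are illustrative. To close the window where a consumer crashes after processing but before recording the key, markProcessed can share a transaction with the business write, as in the database example further up:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class DbDeduplicator {

    // Returns true if the key was seen before, i.e. the message is a duplicate.
    static boolean alreadyProcessed(Connection conn, String messageKey) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(
                "SELECT 1 FROM processed_messages WHERE message_key = ?")) {
            ps.setString(1, messageKey);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next();
            }
        }
    }

    // Record the key once processing succeeds; the primary key constraint
    // rejects a second insert if two consumers race on the same message.
    static void markProcessed(Connection conn, String messageKey) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO processed_messages(message_key) VALUES (?)")) {
            ps.setString(1, messageKey);
            ps.executeUpdate();
        }
    }
}
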
Impish answered 30/11, 2023 at 4:36 Comment(0)
