How to de-duplicate records going from AWS Kinesis Firehose to Redshift?

I read the official AWS Kinesis Firehose documentation, but it doesn't mention how to handle duplicated events. Does anyone have experience with this? I found that some people use ElastiCache to do the filtering; does that mean I need AWS Lambda to encapsulate such filtering logic? Is there any way as simple as Firehose to ingest data into Redshift that also provides "exactly once" semantics? Thanks a lot!

Spiers answered 16/1, 2016 at 7:19 Comment(3)
Hi! Can you link to or reference that document? Thank you.Autecology
Here you are: blog.flux7.com/amazon-kinesis-a-use-case Spiers
It's expected, as Firehose guarantees at-least-once semantics. Here's the official documentation related to this behaviour.Federation

You can have duplication on both sides of a Kinesis stream: producers might put the same event into the stream twice, and consumers might read the same event twice.

Producer-side duplication can happen if you put an event to the Kinesis stream but, for some reason, can't be sure whether the write succeeded, so you put it again. Consumer-side duplication can happen if you fetch a batch of events and start processing them, then crash before you manage to checkpoint your position; the next worker picks up the same batch of events from the Kinesis stream, based on the last checkpointed sequence ID.

Before you start solving this problem, evaluate how often such duplication occurs and what its business impact is. Not every system handles financial transactions that can't tolerate duplication. Nevertheless, if you decide that you need de-duplication, a common way to implement it is to give every event an event ID and track whether you have already processed that event ID.
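For example, here is a minimal producer-side sketch (my own illustration, not from this answer) that attaches a client-generated event ID with boto3; the stream name and payload shape are assumptions:

```python
import json
import uuid

import boto3

kinesis = boto3.client("kinesis")

def put_event(payload: dict, stream_name: str = "my-stream") -> None:
    # Generate the event ID once, before any put attempt, so that a retry
    # re-sends the *same* ID and the duplicate stays detectable downstream.
    record = {"event_id": str(uuid.uuid4()), **payload}
    data = json.dumps(record).encode("utf-8")
    kinesis.put_record(
        StreamName=stream_name,
        Data=data,
        PartitionKey=record["event_id"],
    )
```

If a put times out and you retry with the same record bytes, the consumer can recognise the repeated event_id and drop the copy.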

ElastiCache with Redis is a good place to track your event IDs. Every time you pick up an event for processing, check whether it is already present in Redis: if you find it, skip the event; if you don't, add it (with a TTL based on the possible time window for such duplication).
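A minimal sketch of that check, assuming the redis-py client and the event_id field from above (the endpoint, key prefix, and TTL are all assumptions):

```python
import redis

r = redis.Redis(host="my-cache.example.cache.amazonaws.com", port=6379)

DUP_WINDOW_SECONDS = 6 * 60 * 60  # TTL covering the expected duplication window

def is_first_time(event_id: str) -> bool:
    # SET with nx=True is atomic: if two workers race on the same event_id,
    # only one SET succeeds, so only one of them sees the event as "new".
    return r.set(f"seen:{event_id}", 1, nx=True, ex=DUP_WINDOW_SECONDS) is True

def process(record: dict) -> None:
    print("processing", record["event_id"])  # placeholder business logic

def handle(record: dict) -> None:
    if is_first_time(record["event_id"]):
        process(record)
    # else: a duplicate arrived within the TTL window; skip it
```

Because the membership test and the insert are a single atomic SET, this also sidesteps the check-then-write race discussed in the comments below, without an explicit lock.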

If you choose to use Kinesis Firehose (instead of Kinesis Streams), you no longer control the consumer application, so you can't implement this process there. Therefore, you either run the de-duplication logic on the producer side, switch to Kinesis Streams and run your own code in Lambda or the KCL, or settle for the de-duplication functions available in Redshift (see below).
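As a sketch of the Lambda option (again my illustration, not this answer's code): a handler on a Kinesis Streams trigger can decode each record and reuse the is_first_time() check from above; load_into_redshift() is a hypothetical loader, e.g. a staging table plus COPY:

```python
import base64
import json

def lambda_handler(event, context):
    fresh = []
    for wrapper in event["Records"]:
        # Kinesis delivers the payload base64-encoded inside the Lambda event.
        record = json.loads(base64.b64decode(wrapper["kinesis"]["data"]))
        if is_first_time(record["event_id"]):  # the Redis check sketched above
            fresh.append(record)
    load_into_redshift(fresh)  # hypothetical loader (staging table + COPY)
```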

If you are not too sensitive to duplication, you can lean on Redshift itself, using functions such as COUNT(DISTINCT ...) or LAST_VALUE in a window function.
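For instance, a query in this spirit (using ROW_NUMBER, a close cousin of the LAST_VALUE approach named above; the table and column names events, event_id, and arrival_ts are assumptions), run here through psycopg2:

```python
import psycopg2  # Redshift speaks the Postgres wire protocol

# Keep only the latest row per event_id; duplicates are filtered at read
# time, so the underlying (immutable) table is left untouched.
DEDUP_SQL = """
SELECT *
FROM (
    SELECT e.*,
           ROW_NUMBER() OVER (
               PARTITION BY event_id
               ORDER BY arrival_ts DESC
           ) AS rn
    FROM events e
) t
WHERE rn = 1;
"""

conn = psycopg2.connect(
    host="my-cluster.example.us-east-1.redshift.amazonaws.com",
    port=5439, dbname="analytics", user="ro_user", password="...",
)
with conn, conn.cursor() as cur:
    cur.execute(DEDUP_SQL)
    for row in cur.fetchmany(10):
        print(row)
```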

Felting answered 16/1, 2016 at 22:39 Comment(6)
Thanks for your answer. My use case is financial calculation, so it can't tolerate duplication. What I'm wondering now is which type of ElastiCache I should use for such a de-duplication use case: Memcached or Redis.Spiers
Probably Redis: it has better TPS, and the memory needed for the keys is very small and can fit on a single instance.Felting
Using a Redis cache will not guarantee removal of duplicates. If you have multiple Kinesis shards, random PartitionKeys, and are using Lambda, then the Lambda instances are unaware of each other. This means possible race conditions where one Lambda could be writing to Redis while another is reading from Redis. You would therefore need to implement a lock that each process vies for (letting only one process at a time access Redis). This would be bad for performance. Consider PartitionKeys that are a hash of the message, so that parallel processing of the same message is addressed.Surpass
Redis is single-threaded, and you mostly don't need locks like in a multi-threaded environment.Felting
So to summarize: if duplicates are a big issue, it's best not to go through Firehose and instead write to Redshift using, for example, a Lambda function (staging table, COPY command, etc.). Is that correct?Hydric
It depends what you mean by duplicates being a big issue. If you have many duplicates, you will make a lot of calls trying to dedup them; but if a single duplicate is a big problem for your reporting output in Redshift, you want to dedup as reliably as possible. I'm not a great supporter of staging tables in Redshift. Analytical data should be immutable as much as possible, and you can use COUNT DISTINCT or other tricks to handle duplicates.Felting

Not sure if this could be a solution, but to handle duplicates you need to write your own KCL consumer. Firehose cannot guarantee the absence of duplication. You can get rid of Firehose once you have your own KCL consumers that process your data from the Kinesis Data Stream. If you do so, you can follow the linked article (full disclosure: author here), which stores events in S3 after de-duplicating and processing them through a KCL consumer.

Store events by grouping them based on the minute they were received by the Kinesis data stream, by looking at their ApproximateArrivalTimestamp. This lets you always save a given batch of records under the same key prefix, no matter when it is processed. For example, all events received by Kinesis at 2020/02/02 15:55 will be stored under /2020/02/02/15/55/*. Therefore, if the key is already present for the given minute, it means that the batch has already been processed and stored to S3.
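A minimal sketch of that key scheme (the helper name is mine; the timestamp is whatever ApproximateArrivalTimestamp your consumer hands you):

```python
import datetime

def minute_prefix(arrival: datetime.datetime) -> str:
    """Map a record's ApproximateArrivalTimestamp to its per-minute key prefix."""
    return arrival.strftime("%Y/%m/%d/%H/%M/")

# A batch re-processed after a crash maps to a prefix that already exists
# in S3, so it can be detected and skipped instead of written twice.
ts = datetime.datetime(2020, 2, 2, 15, 55, 12)
assert minute_prefix(ts) == "2020/02/02/15/55/"
```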

You can implement your own ISequenceStore, which in your case would be backed by Redshift (in the article it is done against S3). Read the full article below.
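To make the idea concrete, here is a purely hypothetical sketch of what a Redshift-backed sequence store might look like; the article's actual ISequenceStore is not reproduced here, and every method and table name below is invented for illustration:

```python
from abc import ABC, abstractmethod
from typing import Optional

class ISequenceStore(ABC):
    """Tracks the last Kinesis sequence number durably persisted per shard."""

    @abstractmethod
    def read_sequence(self, shard_id: str) -> Optional[str]:
        """Return the last stored sequence number for a shard, if any."""

    @abstractmethod
    def write_sequence(self, shard_id: str, sequence_number: str) -> None:
        """Persist the sequence number once its batch is safely stored."""

class RedshiftSequenceStore(ISequenceStore):
    """Hypothetical Redshift-backed variant: one row per shard; a batch whose
    sequence number is <= the stored one has already been handled."""

    def __init__(self, conn):
        self.conn = conn  # e.g. a psycopg2 connection to Redshift

    def read_sequence(self, shard_id: str) -> Optional[str]:
        with self.conn.cursor() as cur:
            cur.execute("SELECT seq FROM shard_progress WHERE shard_id = %s",
                        (shard_id,))
            row = cur.fetchone()
        return row[0] if row else None

    def write_sequence(self, shard_id: str, sequence_number: str) -> None:
        with self.conn.cursor() as cur:
            cur.execute("UPDATE shard_progress SET seq = %s WHERE shard_id = %s",
                        (sequence_number, shard_id))
            if cur.rowcount == 0:
                cur.execute("INSERT INTO shard_progress (shard_id, seq) "
                            "VALUES (%s, %s)", (shard_id, sequence_number))
        self.conn.commit()
```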

https://www.nabin.dev/avoiding-duplicate-records-with-aws-kcl-and-s3

Fernery answered 4/2, 2020 at 10:28 Comment(0)
