Event de-duplication using Cassandra

I'm looking for the best way to de-duplicate events using Cassandra.

I have many clients receiving event ids (thousands per second). I need to ensure that each event id is processed once and only once, with high reliability and high availability.

So far I've tried two methods:

  1. Use the event id as a partition key and do an "INSERT ... IF NOT EXISTS". If that fails, the event is a duplicate and can be dropped. This is a nice clean approach, but the throughput is not great due to Paxos, especially with higher replication factors such as 3. It's also fragile, since IF NOT EXISTS always requires a quorum to work and there's no way to back down to a lower consistency level if a quorum isn't available. So a couple of down nodes will completely block some event ids from being processed. (A sketch of this call pattern follows the list.)

  2. Allow clients to collide on the same event id, but then detect the collision using a clustering column. So insert using the event id as a partition key and a client-generated timeuuid as a clustering column. The client then waits a while (in case other clients are inserting the same partition key) and does a read of the event id with LIMIT 1 to return the oldest clustered row. If the timeuuid it reads back matches what it inserted, it is the "winner" and processes the event. If the timeuuid does not match, the event is a duplicate and can be dropped. (Sketched after the next paragraph.)
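
A minimal sketch of approach 1 with the DataStax Python driver; the keyspace and table names are assumptions for illustration:

    # Approach 1: claim an event id with a lightweight transaction (Paxos).
    from cassandra.cluster import Cluster

    # Assumed schema:
    # CREATE TABLE processed_events (event_id text PRIMARY KEY);
    cluster = Cluster(["127.0.0.1"])
    session = cluster.connect("events_ks")

    insert = session.prepare(
        "INSERT INTO processed_events (event_id) VALUES (?) IF NOT EXISTS"
    )

    def try_claim(event_id):
        # was_applied is False when the row already existed, i.e. a duplicate.
        # The Paxos round behind this needs a quorum of replicas, which is
        # exactly the availability constraint described above.
        return session.execute(insert, [event_id]).was_applied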

The collision (bakery-algorithm) approach has much better throughput and availability than using IF NOT EXISTS, but it's more complex and feels more risky. For example, if the system clock on a client is out of whack, then a duplicate event would look like a non-duplicate. All my client and Cass nodes use NTP, but that's not always perfect at synchronizing clocks.
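
A sketch of the collision approach, under the same illustrative assumptions (a claims table with the timeuuid as an ascending clustering column):

    # Approach 2: insert a claim, wait, then read back the oldest claim.
    import time
    import uuid

    from cassandra.cluster import Cluster

    # Assumed schema:
    # CREATE TABLE event_claims (
    #     event_id text, claim timeuuid,
    #     PRIMARY KEY (event_id, claim)
    # ) WITH CLUSTERING ORDER BY (claim ASC);
    cluster = Cluster(["127.0.0.1"])
    session = cluster.connect("events_ks")

    insert = session.prepare(
        "INSERT INTO event_claims (event_id, claim) VALUES (?, ?)")
    oldest = session.prepare(
        "SELECT claim FROM event_claims WHERE event_id = ? LIMIT 1")

    def is_winner(event_id, settle_seconds=0.5):
        my_claim = uuid.uuid1()        # timeuuid based on this client's clock
        session.execute(insert, [event_id, my_claim])
        time.sleep(settle_seconds)     # give competing inserts time to land
        row = session.execute(oldest, [event_id]).one()
        # ASC clustering order means LIMIT 1 returns the oldest timeuuid.
        return row.claim == my_claim

Note that the winner is decided by the timeuuids' embedded wall time, which is why clock skew between clients matters here.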

Anyone have a suggestion for which approach to use? Is there another way to do this?

Also note that my cluster will be set up with three data centers, with about 100 ms latency between DCs.

Thanks.

Polymer answered 29/7, 2015 at 15:4

IF NOT EXISTS does not scale as well as ordinary Cassandra writes (because coordination is slow, but you know that), but it is probably the "official, right" way to do it. There are two other methods that "work":

1) Use an external locking system (ZooKeeper, memcached CAS, etc.) that lets you handle the coordination OUTSIDE of Cassandra.

2) Use an ugly hack of an inverted-timestamp trick so that the first write wins (sketched below). Rather than using a client-supplied timestamp that corresponds to actual wall time, use MAX_LONG - (wall time) as the timestamp. That way, the first write has the highest "timestamp" and takes precedence over subsequent writes. This method works, though it plays havoc with things like DTCS (if you're doing time series and want to use DTCS, don't use this method; DTCS will be horribly confused) and with deletion in general (if you ever want to ACTUALLY DELETE a row with a REAL tombstone, you'll have to write that tombstone with an artificial timestamp as well).
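
A sketch of the inverted-timestamp trick, again with illustrative names, and assuming the driver/Cassandra version supports a bind marker in the USING TIMESTAMP clause; the client still reads the row back to learn whether its write survived:

    # Method 2: make the first write win by inverting the write timestamp.
    import time

    from cassandra.cluster import Cluster

    MAX_LONG = 2**63 - 1

    # Assumed schema:
    # CREATE TABLE processed_events (event_id text PRIMARY KEY, owner text);
    cluster = Cluster(["127.0.0.1"])
    session = cluster.connect("events_ks")

    insert = session.prepare(
        "INSERT INTO processed_events (event_id, owner) VALUES (?, ?) "
        "USING TIMESTAMP ?"
    )
    read_owner = session.prepare(
        "SELECT owner FROM processed_events WHERE event_id = ?")

    def claim(event_id, client_id):
        # Earlier wall time => larger Cassandra timestamp => the FIRST write
        # takes precedence when replicas reconcile, instead of the last.
        wall_micros = int(time.time() * 1000000)
        session.execute(insert, [event_id, client_id, MAX_LONG - wall_micros])
        return session.execute(read_owner, [event_id]).one().owner == client_id

To actually delete such a row later, the DELETE would likewise need USING TIMESTAMP with a value above the inverted ones, per the caveat above.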

It's worth noting that there have been attempts to address the 'last-write-always-wins' nature of cassandra - for example, CASSANDRA-6412 (which I had working at one point, and will likely pick up again in the next month or so).

Gnathous answered 3/8, 2015 at 5:48
Your second suggestion is intriguing. I'm planning to use TTL to clean up data after a week or so, and it seems that trick would break TTL. So I'm leaning towards using IF NOT EXISTS, but I'm a little worried about CASSANDRA-9328, although CASSANDRA-6246 might address that and even speed up IF NOT EXISTS when it gets into a release. – Polymer
Fake timestamps do not interact with TTLs. Timestamps and TTLs are stored in separate fields in the cell. Timestamps can be of arbitrary resolution; TTLs are second resolution regardless of clock. – Gnathous
However, timestamps are used to determine whether or not it's safe to purge entire sstables when TTLs do expire, so you will have fairly inefficient expiration of whole sstables at some point (if you're able to spin up a lab, it'd be worth testing). – Gnathous

This might be a diversion, but have you tried distributed Redis locks (http://redis.io/topics/distlock)? If your load is too high for a single instance, you can shard on event_id using Twemproxy as a proxy in front of Redis. A minimal sketch is below.
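
A minimal single-instance sketch with redis-py; the key naming and TTL are assumptions, and the distlock document linked above describes the multi-node Redlock variant:

    # Claim an event id with SET NX PX: only the first claimant succeeds.
    import uuid

    import redis

    r = redis.Redis(host="127.0.0.1", port=6379)

    def try_claim(event_id, ttl_ms=30000):
        token = str(uuid.uuid4())      # unique owner token for safe release
        # nx=True makes the SET succeed only if the key doesn't exist yet;
        # px expires the claim so a crashed client can't block the id forever.
        return bool(r.set("event:%s" % event_id, token, nx=True, px=ttl_ms))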

Stuffy answered 30/7, 2015 at 6:4
Thanks for the suggestion. I haven't used Redis before. It looks like an in-memory KV store, so it would be fast, but the data durability looks weaker than Cassandra's, and it appears to use a master-slave architecture. That would cause me problems since my nodes are periodically re-installed, which I imagine would be a headache when it happened to the master. – Polymer
Can you please elaborate on the "nodes are periodically re-installed" aspect? – Stuffy
We periodically make new software releases which get installed on all machines one by one (and the machine may be rebooted after installation). This is done in an automated way, so at any time a machine may be killed. – Polymer

I think that of all the proposed solutions, your second one is the best. But instead of storing only the oldest value by clustering column, I would store all events, keeping the history ordered from oldest to newest (when inserting, you don't have to check whether the row already exists, is the oldest, etc.). Then I would select the oldest one for processing, as you wrote, for example by its writetime attribute (a sketch of that read is below). Since Cassandra sees no difference between an insert and an upsert, I don't see any alternative way to do it with Cassandra, or, as someone said, do this outside.
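
A sketch of the writetime() read, assuming the claims table carries a regular column (writetime() cannot be applied to primary-key columns); all names are illustrative:

    # Pick the claimant whose cell has the smallest server-side write timestamp.
    from cassandra.cluster import Cluster

    # Assumed schema:
    # CREATE TABLE event_claims (
    #     event_id text, claim timeuuid, client_id text,
    #     PRIMARY KEY (event_id, claim)
    # );
    cluster = Cluster(["127.0.0.1"])
    session = cluster.connect("events_ks")

    query = session.prepare(
        "SELECT client_id, writetime(client_id) AS wt "
        "FROM event_claims WHERE event_id = ?"
    )

    def oldest_claimant(event_id):
        rows = list(session.execute(query, [event_id]))
        # writetime() is assigned server-side (or by the coordinator), so it
        # sidesteps client clocks, at the cost of reading the whole partition.
        return min(rows, key=lambda row: row.wt).client_id if rows else None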

Isocracy answered 3/8, 2015 at 6:43
