How to expire state of dropDuplicates in structured streaming to avoid OOM?
O

4

6

I want to count the unique access for each day using spark structured streaming, so I use the following code

.dropDuplicates("uuid")

and in the next day the state maintained for today should be dropped so that I can get the right count of unique access of the next day and avoid OOM. The spark document indicates using dropDuplicates with watermark, for example:

.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")

but the watermark column must be specified in dropDuplicates. In such case the uuid and timestamp will be used as a combined key to deduplicate elements with the same uuid and timestamp, which is not what I expected.

So is there a perfect solution?

Obed answered 3/8, 2017 at 3:27 Comment(0)
O
11

After a few days effort I finally find out the way myself.

While studying the source code of watermark and dropDuplicates, I discovered that besides an eventTime column, watermark also supports window column, so we can use the following code:

.select(
    window($"timestamp", "1 day"),
    $"timestamp",
    $"uuid"
  )
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")

Since all events in the same day have the same window, this will produce the same results as using only uuid to deduplicate. Hopes can help someone.

Obed answered 7/8, 2017 at 9:31 Comment(0)
C
0

Below is the modification of the procedure proposed in Spark documentation. Trick is to manipulate event time i.e. put event time in buckets. Assumption is that event time is provided in milliseconds.

// removes all duplicates that are in 15 minutes tumbling window.
// doesn't remove duplicates that are in different 15 minutes windows !!!!
public static Dataset<Row> removeDuplicates(Dataset<Row> df) {
    // converts time in 15 minute buckets
    // timestamp - (timestamp % (15 * 60))
    Column bucketCol = functions.to_timestamp(
            col("event_time").divide(1000).minus((col("event_time").divide(1000)).mod(15*60)));
    df = df.withColumn("bucket", bucketCol);

    String windowDuration = "15 minutes";
    df = df.withWatermark("bucket", windowDuration)
            .dropDuplicates("uuid", "bucket");

    return df.drop("bucket");
}
Chris answered 14/11, 2018 at 21:7 Comment(0)
C
0

I found out that window function didn't work so I chose to use window.start or window.end.

.select(
   window($"timestamp", "1 day").start,
   $"timestamp",
   $"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
Copyist answered 18/11, 2018 at 2:58 Comment(0)
A
0

This question is very old ! However for someone facing similar issue now , spark latest version has provided dropDuplicatesWithinWatermark option for this scenario :

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
streamingDf \
  .withWatermark("eventTime", "10 hours") \
  .dropDuplicatesWithinWatermark("guid")

Where watermark column need not be specified in dropDuplicates. Reference : https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication

Arbalest answered 30/11, 2023 at 5:44 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.