Rolling average without timestamp in pyspark

We can find the rolling/moving average of time series data using a window function in PySpark.
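With a timestamp column, that typically looks like this (a minimal sketch; ts, value, and df_ts are placeholder names, not columns from my data):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Trailing 10-minute average per session, keyed on epoch seconds
w = (Window.partitionBy("session_id")
           .orderBy(F.col("ts").cast("long"))
           .rangeBetween(-600, 0))
df_ts = df_ts.withColumn("rolling_avg", F.avg("value").over(w))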

The data I am dealing with doesn't have any timestamp column, but it does have a strictly increasing column frame_number. The data looks like this:

d = [
    {'session_id': 1, 'frame_number': 1, 'rtd': 11.0, 'rtd2': 11.0},
    {'session_id': 1, 'frame_number': 2, 'rtd': 12.0, 'rtd2': 6.0},
    {'session_id': 1, 'frame_number': 3, 'rtd': 4.0, 'rtd2': 233.0},
    {'session_id': 1, 'frame_number': 4, 'rtd': 110.0, 'rtd2': 111.0},
    {'session_id': 1, 'frame_number': 5, 'rtd': 13.0, 'rtd2': 6.0},
    {'session_id': 1, 'frame_number': 6, 'rtd': 43.0, 'rtd2': 233.0},
    {'session_id': 1, 'frame_number': 7, 'rtd': 11.0, 'rtd2': 111.0}]

df = spark.createDataFrame(d)

+------------+-----+-----+----------+
|frame_number|  rtd| rtd2|session_id|
+------------+-----+-----+----------+
|           1| 11.0| 11.0|         1| 
|           2| 12.0|  6.0|         1|
|           3|  4.0|233.0|         1|
|           4|110.0|111.0|         1|
|           5| 13.0|  6.0|         1|
|           6| 43.0|233.0|         1|
|           7| 11.0|111.0|         1|
+------------+-----+-----+----------+

I want to find the rolling average of the column rtd on the strictly increasing column frame_number.

I am trying something like this (using collect_list):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window_size = 2
# Forward-looking frame: the current row plus the next `window_size` rows
w = Window.partitionBy("session_id").orderBy("frame_number").rowsBetween(0, window_size)
df_lists = df.withColumn('rtd_list', F.collect_list('rtd').over(w))

+------------+-----+-----+----------+-------------------+
|frame_number|  rtd| rtd2|session_id|           rtd_list|
+------------+-----+-----+----------+-------------------+
|           1| 11.0| 11.0|         1|  [11.0, 12.0, 4.0]|
|           2| 12.0|  6.0|         1| [12.0, 4.0, 110.0]|
|           3|  4.0|233.0|         1| [4.0, 110.0, 13.0]|
|           4|110.0|111.0|         1|[110.0, 13.0, 43.0]|
|           5| 13.0|  6.0|         1| [13.0, 43.0, 11.0]|
|           6| 43.0|233.0|         1|       [43.0, 11.0]|
|           7| 11.0|111.0|         1|             [11.0]|
+------------+-----+-----+----------+-------------------+

And then applying a UDF to get the moving average:

import numpy as np
from pyspark.sql.types import StringType

# nanmean ignores NaN entries in the collected list
windudf = F.udf(lambda v: str(np.nanmean(v)), StringType())
out = df_lists.withColumn("moving_average", windudf("rtd_list"))
+------------+-----+-----+----------+-------------------+------------------+
|frame_number|  rtd| rtd2|session_id|           rtd_list|    moving_average|
+------------+-----+-----+----------+-------------------+------------------+
|           1| 11.0| 11.0|         1|  [11.0, 12.0, 4.0]|               9.0|
|           2| 12.0|  6.0|         1| [12.0, 4.0, 110.0]|              42.0|
|           3|  4.0|233.0|         1| [4.0, 110.0, 13.0]|42.333333333333336|
|           4|110.0|111.0|         1|[110.0, 13.0, 43.0]|55.333333333333336|
|           5| 13.0|  6.0|         1| [13.0, 43.0, 11.0]|22.333333333333332|
|           6| 43.0|233.0|         1|       [43.0, 11.0]|              27.0|
|           7| 11.0|111.0|         1|             [11.0]|              11.0|
+------------+-----+-----+----------+-------------------+------------------+    

The issue with the above method is that it cannot define a slide duration for the window: it calculates a moving average for every frame. I want to move my window by some amount before finding the average. Is there any way to achieve this?

Ulibarri asked 13/4/2018 at 1:13
Can you clarify this phrase: "I want to move my window by some amount before finding the average"? – Valiant
Let's say window_size is 10. Then the average should be calculated at every slide of 5, i.e., if the average is the mean of the next 10 frames, calculate the average at frame 0, frame 5, frame 10, and so on. – Ulibarri

Define the window:

from pyspark.sql import functions as F

w = F.window(
    F.col("frame_number").cast("timestamp"),
    # Example durations: the cast turns frame numbers into epoch seconds,
    # so this is a 10-frame window sliding by 5 frames
    windowDuration="10 seconds",
    slideDuration="5 seconds",
).alias("window")

(df
    .groupBy(w, F.col("session_id"))
    .avg("rtd", "rtd2")
    .withColumn("window", F.col("window").cast("struct<start:long,end:long>"))
    .orderBy("window.start")
    .show())

# +------+----------+------------------+------------------+       
# |window|session_id|          avg(rtd)|         avg(rtd2)|
# +------+----------+------------------+------------------+
# |[-5,5]|         1|             34.25|             90.25|
# |[0,10]|         1|29.142857142857142|101.57142857142857|
# |[5,15]|         1|22.333333333333332|116.66666666666667|
# +------+----------+------------------+------------------+
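The cast is what makes this work: Spark interprets a number cast to timestamp as seconds since the epoch, so frame_number n lands at second n, and windowDuration="10 seconds" with slideDuration="5 seconds" amounts to a 10-frame window sliding by 5 frames. To inspect which frames each window covers, here is a quick sketch reusing the w defined above (collect_list is fine here as a one-off inspection, not a per-row aggregation):

(df
    .groupBy(w)
    .agg(F.collect_list("frame_number").alias("frames"))
    .withColumn("window", F.col("window").cast("struct<start:long,end:long>"))
    .orderBy("window.start")
    .show(truncate=False))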

Also, please don't use collect_list with a UDF to compute the average. It gives no benefit and has severe performance implications.
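For the row-based window in the question, the built-in F.avg over the same window spec replaces the whole collect_list-plus-UDF step (a sketch; F.avg ignores nulls much as np.nanmean ignores NaNs):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window_size = 2
w2 = (Window.partitionBy("session_id")
            .orderBy("frame_number")
            .rowsBetween(0, window_size))

# Native windowed average: stays in the JVM, no Python serialization per row
out = df.withColumn("moving_average", F.avg("rtd").over(w2))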

Sergio answered 13/4/2018 at 10:32
Wow. That was so simple. This solution rocks. – Ulibarri
