We can find the rolling/moving average of time series data using a window function in PySpark.
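For reference, the usual pattern is a row-based window plus F.avg; in the sketch below, ts and value are placeholder column names, not columns from my data:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rolling mean over the current row and the two rows before it
w = Window.orderBy("ts").rowsBetween(-2, 0)
df_avg = df.withColumn("moving_avg", F.avg("value").over(w))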
The data I am dealing with doesn't have a timestamp column, but it does have a strictly increasing column frame_number. The data looks like this:
d = [
{'session_id': 1, 'frame_number': 1, 'rtd': 11.0, 'rtd2': 11.0,},
{'session_id': 1, 'frame_number': 2, 'rtd': 12.0, 'rtd2': 6.0},
{'session_id': 1, 'frame_number': 3, 'rtd': 4.0, 'rtd2': 233.0},
{'session_id': 1, 'frame_number': 4, 'rtd': 110.0, 'rtd2': 111.0,},
{'session_id': 1, 'frame_number': 5, 'rtd': 13.0, 'rtd2': 6.0},
{'session_id': 1, 'frame_number': 6, 'rtd': 43.0, 'rtd2': 233.0},
{'session_id': 1, 'frame_number': 7, 'rtd': 11.0, 'rtd2': 111.0,}]
df = spark.createDataFrame(d)
+------------+-----+-----+----------+
|frame_number| rtd| rtd2|session_id|
+------------+-----+-----+----------+
| 1| 11.0| 11.0| 1|
| 2| 12.0| 6.0| 1|
| 3| 4.0|233.0| 1|
| 4|110.0|111.0| 1|
| 5| 13.0| 6.0| 1|
| 6| 43.0|233.0| 1|
| 7| 11.0|111.0| 1|
+------------+-----+-----+----------+
I want to find the rolling average of the column rtd, ordered by the strictly increasing column frame_number. I am trying something like this (using collect_list):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window_size = 2
# Forward-looking window: the current row plus the next `window_size` rows
w = Window.partitionBy("session_id").orderBy("frame_number").rowsBetween(0, window_size)
df_lists = df.withColumn('rtd_list', F.collect_list('rtd').over(w))
+------------+-----+-----+----------+-------------------+
|frame_number| rtd| rtd2|session_id| rtd_list|
+------------+-----+-----+----------+-------------------+
| 1| 11.0| 11.0| 1| [11.0, 12.0, 4.0]|
| 2| 12.0| 6.0| 1| [12.0, 4.0, 110.0]|
| 3| 4.0|233.0| 1| [4.0, 110.0, 13.0]|
| 4|110.0|111.0| 1|[110.0, 13.0, 43.0]|
| 5| 13.0| 6.0| 1| [13.0, 43.0, 11.0]|
| 6| 43.0|233.0| 1| [43.0, 11.0]|
| 7| 11.0|111.0| 1| [11.0]|
+------------+-----+-----+----------+-------------------+
Then I apply a UDF to get the moving average:

import numpy as np
from pyspark.sql.types import StringType

# np.nanmean skips NaN entries; the result is returned as a string
windudf = F.udf(lambda v: str(np.nanmean(v)), StringType())
out = df_lists.withColumn("moving_average", windudf("rtd_list"))
+------------+-----+-----+----------+-------------------+------------------+
|frame_number| rtd| rtd2|session_id| rtd_list| moving_average|
+------------+-----+-----+----------+-------------------+------------------+
| 1| 11.0| 11.0| 1| [11.0, 12.0, 4.0]| 9.0|
| 2| 12.0| 6.0| 1| [12.0, 4.0, 110.0]| 42.0|
| 3| 4.0|233.0| 1| [4.0, 110.0, 13.0]|42.333333333333336|
| 4|110.0|111.0| 1|[110.0, 13.0, 43.0]|55.333333333333336|
| 5| 13.0| 6.0| 1| [13.0, 43.0, 11.0]|22.333333333333332|
| 6| 43.0|233.0| 1| [43.0, 11.0]| 27.0|
| 7| 11.0|111.0| 1| [11.0]| 11.0|
+------------+-----+-----+----------+-------------------+------------------+
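Note that this same per-frame rolling mean could also be computed without collect_list and the UDF, using the built-in F.avg over the same window w (a minimal sketch; unlike np.nanmean, F.avg skips nulls rather than NaNs):

# Native rolling mean over the same forward-looking window
out = df.withColumn("moving_average", F.avg("rtd").over(w))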
Either way, the issue is that I cannot define a slide duration for the window: these methods calculate the moving average for every frame. I want to move my window by some amount before finding the average. Is there any way to achieve this?
What do you mean by "I want to move my window by some amount before finding the average"? – Valiant

Say window_size is 10. Then the average should be calculated at every slide of 5; i.e., if each average is the mean of the next 10 frames, calculate it at frame 0, frame 5, frame 10, and so on. – Ulibarri
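Based on that clarification, one possible sketch: compute the forward-looking rolling average for every frame with F.avg, then keep only the frames that fall on a slide boundary. Here window_size and slide take the values from the comment, and frames are assumed to start at 1 as in the sample data:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window_size = 10  # each average covers the current frame plus the next 9
slide = 5         # evaluate the average every 5 frames

w = Window.partitionBy("session_id").orderBy("frame_number").rowsBetween(0, window_size - 1)

out = (df
       .withColumn("moving_average", F.avg("rtd").over(w))
       # frame_number starts at 1, so this keeps frames 1, 6, 11, ...
       .where((F.col("frame_number") - 1) % slide == 0))

Different slide semantics (for example, averaging only over complete windows) would just change the filter condition or the window bounds.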