pyspark: rolling average using timeseries data
V

4

52

I have a dataset consisting of a timestamp column and a dollars column. I would like to find the average number of dollars per week ending at the timestamp of each row. I was initially looking at the pyspark.sql.functions.window function, but that bins the data by week.

Here's an example:

%pyspark
import datetime
from pyspark.sql import functions as F

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"])
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg'))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect()

This results in two records:

|        start        |          end         | avg |
|---------------------|----------------------|-----|
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0|
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0|

The window function binned the time series data rather than performing a rolling average.
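
The bin edges in that output are consistent with fixed 7-day windows aligned to the Unix epoch (1970-01-01 00:00:00 UTC, a Thursday), which is why both windows start on a Thursday. Here is a rough plain-Python sketch of that binning. It only emulates the behaviour and is not Spark code; it assumes UTC timestamps and no start offset, and the helper name `bin_start` is made up for illustration:

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WEEK = timedelta(days=7)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def bin_start(ts):
    """Start of the fixed 7-day bin containing ts (hypothetical helper)."""
    # Integer number of whole weeks since the epoch, scaled back to a datetime.
    return EPOCH + ((ts - EPOCH) // WEEK) * WEEK

rows = [
    (17, datetime(2017, 3, 11, 15, 27, 18, tzinfo=timezone.utc)),
    (13, datetime(2017, 3, 11, 12, 27, 18, tzinfo=timezone.utc)),
    (21, datetime(2017, 3, 17, 11, 27, 18, tzinfo=timezone.utc)),
]

# Group dollars by the bin each timestamp falls into, then average per bin.
bins = defaultdict(list)
for dollars, ts in rows:
    bins[bin_start(ts)].append(dollars)

binned_avg = {start: sum(v) / len(v) for start, v in bins.items()}
for start in sorted(binned_avg):
    print(start.date(), (start + WEEK).date(), binned_avg[start])
```

Every row in a bin gets the same average no matter where it sits inside the bin, which is the opposite of a window that ends at each row's own timestamp.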

Is there a way to perform a rolling average where I'll get back a weekly average for each row with a time period ending at the timestampGMT of the row?

EDIT:

Zhang's answer below is close to what I want, but not exactly what I'd like to see.

Here's a better example to show what I'm trying to get at:

%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                        (13, "2017-03-15T12:27:18+00:00"),
                        (25, "2017-03-18T11:27:18+00:00")],
                        ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days"))))

This results in the following dataframe:

dollars timestampGMT            rolling_average
25      2017-03-18 11:27:18.0   25
17      2017-03-10 15:27:18.0   15
13      2017-03-15 12:27:18.0   15

I'd like the average to be over the week preceding the timestamp in the timestampGMT column, which would result in this:

dollars timestampGMT            rolling_average
17      2017-03-10 15:27:18.0   17
13      2017-03-15 12:27:18.0   15
25      2017-03-18 11:27:18.0   19

In the above results, the rolling_average for 2017-03-10 is 17, since there are no preceding records. The rolling_average for 2017-03-15 is 15 because it averages the 13 from 2017-03-15 and the 17 from 2017-03-10, which falls within the preceding 7-day window. The rolling_average for 2017-03-18 is 19 because it averages the 25 from 2017-03-18 and the 13 from 2017-03-15, which falls within the preceding 7-day window; it does not include the 17 from 2017-03-10 because that falls outside the preceding 7-day window.
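
To double-check those expected values without Spark, here is a small plain-Python sketch that averages, for each row, all rows whose timestamp lies in the 7 days up to and including that row's timestamp. The function name `trailing_average` is only illustrative, and treating both ends of the window as inclusive is an assumption:

```python
from datetime import datetime, timedelta

rows = [
    (17, datetime(2017, 3, 10, 15, 27, 18)),
    (13, datetime(2017, 3, 15, 12, 27, 18)),
    (25, datetime(2017, 3, 18, 11, 27, 18)),
]

def trailing_average(rows, days=7):
    """For each row, average dollars over rows with ts - days <= t <= ts."""
    span = timedelta(days=days)
    result = []
    for _, ts in rows:
        in_window = [d for d, t in rows if ts - span <= t <= ts]
        result.append(sum(in_window) / len(in_window))
    return result

expected = trailing_average(rows)
print(expected)  # [17.0, 15.0, 19.0]
```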

Is there a way to do this instead of using the binning window, whose weekly windows don't overlap?

Venipuncture answered 21/8, 2017 at 22:6 Comment(0)
V
51

I figured out the correct way to calculate a moving/rolling average using this Stack Overflow question:

Spark Window Functions - rangeBetween dates

The basic idea is to convert your timestamp column to seconds, and then you can use the rangeBetween function in the pyspark.sql.Window class to include the correct rows in your window.
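
To make the seconds arithmetic concrete, here is a small plain-Python sketch (not Spark code) that converts the example timestamps to epoch seconds and compares the gaps against a 7-day span. The helper name `epoch_seconds` is made up for illustration:

```python
from datetime import datetime

SECONDS_PER_DAY = 86400

def epoch_seconds(iso_string):
    """Seconds since 1970-01-01 UTC for an ISO-8601 string with a UTC offset."""
    return int(datetime.fromisoformat(iso_string).timestamp())

t10 = epoch_seconds("2017-03-10T15:27:18+00:00")
t15 = epoch_seconds("2017-03-15T12:27:18+00:00")
t18 = epoch_seconds("2017-03-18T11:27:18+00:00")

window = 7 * SECONDS_PER_DAY  # 604800 seconds

# Gap from each earlier row to the 2017-03-18 row, and whether it fits the window.
print(t18 - t15, t18 - t15 <= window)  # 255600 True
print(t18 - t10, t18 - t10 <= window)  # 676800 False
```

The 2017-03-15 row is about 3 days back and lands inside the frame, while the 2017-03-10 row is just under 8 days back and is left out.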

Here's the solved example:

%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window


#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                        (13, "2017-03-15T12:27:18+00:00"),
                        (25, "2017-03-18T11:27:18+00:00")],
                        ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('rolling_average', F.avg("dollars").over(w))

This results in the exact column of rolling averages that I was looking for:

dollars   timestampGMT            rolling_average
17        2017-03-10 15:27:18.0   17.0
13        2017-03-15 12:27:18.0   15.0
25        2017-03-18 11:27:18.0   19.0
Venipuncture answered 22/8, 2017 at 18:10 Comment(3)
If you have a complete, continuous date column, then you can use rowsBetween(-7, 0). - Hoang
This uses a window function, which forces the data frame onto a single node. With a very large data frame, you run into memory issues. Is there a way to use rangeBetween while still leveraging the distributed computing of a Spark dataframe? - Workmanship
From the docs: rangeBetween(start, end) creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive). So in the above code, rangeBetween(-days(7), 0) should be rangeBetween(-days(7) + 1, 0). - Lever
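
The boundary point in the last comment only matters for a row that sits exactly 7 days before the current one. Here is a plain-Python sketch of that edge case. It emulates a range frame with inclusive bounds, as the quoted docs describe; the helper `range_frame_avg` is hypothetical and this is not Spark code:

```python
DAY = 86400

def range_frame_avg(rows, start, end):
    """Average over rows whose key lies in [key + start, key + end], both inclusive."""
    out = []
    for key, _ in rows:
        frame = [amt for k, amt in rows if key + start <= k <= key + end]
        out.append(sum(frame) / len(frame))
    return out

# Keys are seconds; the second row is exactly 7 days after the first.
rows = [(0, 10.0), (7 * DAY, 20.0)]

inclusive = range_frame_avg(rows, -7 * DAY, 0)      # frame reaches back exactly 7 days
shortened = range_frame_avg(rows, -7 * DAY + 1, 0)  # frame stops one second short

print(inclusive)  # [10.0, 15.0]
print(shortened)  # [10.0, 20.0]
```

Which variant is right depends on whether a record exactly one week old should count toward the week.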
A
12

I will add a variation which I personally found very useful. I hope someone will find it useful as well:

If you want to group by a column and then calculate the moving average within each group:

Example dataframe:

from pyspark.sql.window import Window
from pyspark.sql import functions as func


df = spark.createDataFrame([("tshilidzi", 17.00, "2018-03-10T15:27:18+00:00"), 
  ("tshilidzi", 13.00, "2018-03-11T12:27:18+00:00"),   
  ("tshilidzi", 25.00, "2018-03-12T11:27:18+00:00"), 
  ("thabo", 20.00, "2018-03-13T15:27:18+00:00"), 
  ("thabo", 56.00, "2018-03-14T12:27:18+00:00"), 
  ("thabo", 99.00, "2018-03-15T11:27:18+00:00"), 
  ("tshilidzi", 156.00, "2019-03-22T11:27:18+00:00"), 
  ("thabo", 122.00, "2018-03-31T11:27:18+00:00"), 
  ("tshilidzi", 7000.00, "2019-04-15T11:27:18+00:00"),
  ("ash", 9999.00, "2018-04-16T11:27:18+00:00") 
  ],
  ["name", "dollars", "timestampGMT"])

# cast the ISO-8601 string to a timestamp; the window below casts it to seconds
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

df.show(10000, False)

Output:

+---------+-------+---------------------+
|name     |dollars|timestampGMT         |
+---------+-------+---------------------+
|tshilidzi|17.0   |2018-03-10 17:27:18.0|
|tshilidzi|13.0   |2018-03-11 14:27:18.0|
|tshilidzi|25.0   |2018-03-12 13:27:18.0|
|thabo    |20.0   |2018-03-13 17:27:18.0|
|thabo    |56.0   |2018-03-14 14:27:18.0|
|thabo    |99.0   |2018-03-15 13:27:18.0|
|tshilidzi|156.0  |2019-03-22 13:27:18.0|
|thabo    |122.0  |2018-03-31 13:27:18.0|
|tshilidzi|7000.0 |2019-04-15 13:27:18.0|
|ash      |9999.0 |2018-04-16 13:27:18.0|
+---------+-------+---------------------+

To calculate the moving average based on the name and still maintain all rows:

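from pyspark.sql import functions as F

# convert a number of days to a number of seconds
days = lambda i: i * 86400

# partition by name, order by timestamp cast to long (number of seconds),
# and include rows from 7 days before the current row up to the current row
w = (Window
     .partitionBy(F.col("name"))
     .orderBy(F.col("timestampGMT").cast('long'))
     .rangeBetween(-days(7), 0))

df2 = df.withColumn('rolling_average', F.avg("dollars").over(w))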

df2.show(100, False)

Output:

+---------+-------+---------------------+------------------+
|name     |dollars|timestampGMT         |rolling_average   |
+---------+-------+---------------------+------------------+
|ash      |9999.0 |2018-04-16 13:27:18.0|9999.0            |
|tshilidzi|17.0   |2018-03-10 17:27:18.0|17.0              |
|tshilidzi|13.0   |2018-03-11 14:27:18.0|15.0              |
|tshilidzi|25.0   |2018-03-12 13:27:18.0|18.333333333333332|
|tshilidzi|156.0  |2019-03-22 13:27:18.0|156.0             |
|tshilidzi|7000.0 |2019-04-15 13:27:18.0|7000.0            |
|thabo    |20.0   |2018-03-13 17:27:18.0|20.0              |
|thabo    |56.0   |2018-03-14 14:27:18.0|38.0              |
|thabo    |99.0   |2018-03-15 13:27:18.0|58.333333333333336|
|thabo    |122.0  |2018-03-31 13:27:18.0|122.0             |
+---------+-------+---------------------+------------------+
Arluene answered 28/3, 2019 at 11:37 Comment(0)
M
6

It's worth noting that if you don't care about the exact dates, but just want the average over the most recent rows (for example, the last few daily records), you can use the rowsBetween function as follows:

w = Window.orderBy('timestampGMT').rowsBetween(-7, 0)

df = df.withColumn('rolling_average', F.avg('dollars').over(w))

Since you order by the dates, the frame covers the current row and the 7 rows before it, regardless of how far apart their timestamps are. This saves all the casting.
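
Row-based and range-based frames only agree when the rows are evenly spaced. Here is a plain-Python sketch of the difference on the irregular data from the question. Both helpers are hypothetical emulations of the two frame types, not Spark code:

```python
from datetime import datetime, timedelta

rows = sorted([
    (datetime(2017, 3, 10, 15, 27, 18), 17),
    (datetime(2017, 3, 15, 12, 27, 18), 13),
    (datetime(2017, 3, 18, 11, 27, 18), 25),
])

def rows_frame_avg(rows, preceding):
    """Average over the current row and up to `preceding` rows before it."""
    out = []
    for i in range(len(rows)):
        frame = [amt for _, amt in rows[max(0, i - preceding): i + 1]]
        out.append(sum(frame) / len(frame))
    return out

def time_frame_avg(rows, span):
    """Average over rows whose timestamp lies in [ts - span, ts]."""
    out = []
    for ts, _ in rows:
        frame = [amt for t, amt in rows if ts - span <= t <= ts]
        out.append(sum(frame) / len(frame))
    return out

by_rows = rows_frame_avg(rows, preceding=7)
by_time = time_frame_avg(rows, timedelta(days=7))

print(by_rows)  # last value averages all three rows
print(by_time)  # last value skips the row that is more than 7 days old
```

With only three rows, the row-based frame always reaches back to the first record, so the last average includes the 17 that the 7-day range frame excludes.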

Meghan answered 12/3, 2019 at 9:5 Comment(0)
S
2

Do you mean this?

from pyspark.sql import functions as f
from pyspark.sql.window import Window

df = spark.createDataFrame([(17, "2017-03-11T15:27:18+00:00"),
                            (13, "2017-03-11T12:27:18+00:00"),
                            (21, "2017-03-17T11:27:18+00:00")],
                           ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', f.avg("dollars").over(Window.partitionBy(f.window("timestampGMT", "7 days"))))

Output:

+-------+-------------------+---------------+                                   
|dollars|timestampGMT       |rolling_average|
+-------+-------------------+---------------+
|21     |2017-03-17 19:27:18|21.0           |
|17     |2017-03-11 23:27:18|15.0           |
|13     |2017-03-11 20:27:18|15.0           |
+-------+-------------------+---------------+
Supportive answered 22/8, 2017 at 1:34 Comment(1)
Thanks Zhang, that is closer to what I want, but not exactly what I'd like. Your code is still calculating the answers via date binning. I'd like each weekly average to end at the date in the row. It's my fault for not making a great example. I'm going to edit my post with an updated example showing what I'd like. - Venipuncture
