This is an old question, but I recently had to solve this with Spark 3.2. Here is the solution I came up with to both up-sample and down-sample the time series, so as to obtain exactly one data point per object and per time period.
Assume the following input data, which we want to re-sample per day. Some variables have several data points per day, while others have no data at all for several days:
from pyspark.sql.types import ArrayType, TimestampType
from pyspark.sql.functions import udf, date_trunc, row_number, desc, coalesce, datediff, lead, explode, col, lit
from pyspark.sql import Window, Row
from datetime import datetime, timedelta

df = spark.createDataFrame([
    Row(variable="A", record_ts=datetime.fromisoformat("2021-10-01T03:34:23.000"), value=1.),
    Row(variable="B", record_ts=datetime.fromisoformat("2021-10-01T04:34:23.000"), value=10.),
    Row(variable="C", record_ts=datetime.fromisoformat("2021-10-01T05:34:23.000"), value=100.),
    Row(variable="A", record_ts=datetime.fromisoformat("2021-10-02T01:34:23.000"), value=2.),
    Row(variable="A", record_ts=datetime.fromisoformat("2021-10-02T05:34:23.000"), value=3.),
    Row(variable="C", record_ts=datetime.fromisoformat("2021-10-02T02:34:23.000"), value=200.),
    Row(variable="C", record_ts=datetime.fromisoformat("2021-10-02T05:34:23.000"), value=200.),
    Row(variable="B", record_ts=datetime.fromisoformat("2021-10-04T10:34:23.000"), value=40.),
    Row(variable="B", record_ts=datetime.fromisoformat("2021-10-04T12:34:23.000"), value=42.),
    Row(variable="B", record_ts=datetime.fromisoformat("2021-10-04T14:34:23.000"), value=46.),
    Row(variable="A", record_ts=datetime.fromisoformat("2021-10-05T14:34:23.000"), value=6.),
    Row(variable="A", record_ts=datetime.fromisoformat("2021-10-07T09:34:23.000"), value=7.),
    Row(variable="B", record_ts=datetime.fromisoformat("2021-10-07T08:34:23.000"), value=70.),
    Row(variable="C", record_ts=datetime.fromisoformat("2021-10-07T05:34:23.000"), value=700.),
])
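Spark infers a simple schema from those Rows (a string, a timestamp and a double), so printSchema should show something like:

df.printSchema()
# root
#  |-- variable: string (nullable = true)
#  |-- record_ts: timestamp (nullable = true)
#  |-- value: double (nullable = true)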
First, I need this simple UDF, which essentially just builds a sequence of consecutive daily timestamps:
@udf(ArrayType(TimestampType()))
def pad_time(count: int, start_time: datetime):
    # Returns one timestamp per day to fill, starting at start_time.
    if count is None:
        return []
    return [start_time + timedelta(days=c) for c in range(count)]
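To illustrate the logic outside Spark (plain Python, same expression as in the UDF body): a gap of 3 days starting on 2021-10-02 expands into three consecutive daily timestamps:

from datetime import datetime, timedelta

start = datetime.fromisoformat("2021-10-02T00:00:00")
print([start + timedelta(days=c) for c in range(3)])
# [datetime.datetime(2021, 10, 2, 0, 0), datetime.datetime(2021, 10, 3, 0, 0), datetime.datetime(2021, 10, 4, 0, 0)]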
Down-sampling can be done with a simple groupBy or partitionBy, keeping at most one value per variable per day (I chose partitionBy in the example below; see the aside right after this paragraph for a sketch of the groupBy alternative).
Up-sampling with a "fill-forward" strategy can be done by measuring the size of the time gap between two successive rows, and then using that gap to call the UDF above.
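As an aside, a minimal sketch of the groupBy variant of the down-sampling step could look like the snippet below (it is not part of the pipeline that follows, and the name downsampled is just illustrative). It relies on the fact that Spark compares structs field by field, so taking the max of a (record_ts, value) struct keeps the value with the latest timestamp per variable and day:

from pyspark.sql import functions as F

downsampled = (df
    .withColumn("record_day", F.date_trunc("DAY", "record_ts"))
    .groupBy("variable", "record_day")
    # the max of a struct is evaluated field by field, so the latest record_ts wins
    .agg(F.max(F.struct("record_ts", "value")).alias("last"))
    .select("variable", "record_day", F.col("last.value").alias("value"))
)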
result = (df
    # down-sampling: keep the last value of each variable for each day
    .withColumn("record_day", date_trunc("DAY", "record_ts"))
    .withColumn("row_num",
        row_number().over(
            Window.partitionBy("variable", "record_day").orderBy(desc("record_ts"))
        ))
    .where("row_num == 1")
    # up-sampling part 1: count the number of days to be filled (or 1 for the very last value)
    .withColumn("gap",
        coalesce(
            datediff(
                lead("record_day").over(Window.partitionBy("variable").orderBy("record_day")),
                "record_day"),
            lit(1))
    )
    .select(
        # up-sampling part 2: pad the time axis as dictated by "gap"; the other two fields are simply repeated
        explode(pad_time("gap", "record_day")).alias("record_day"),
        "variable",
        "value",
    )
    .orderBy("record_day", "variable")
)

result.show(30)
The result looks like this:
+-------------------+--------+-----+
| record_day|variable|value|
+-------------------+--------+-----+
|2021-10-01 00:00:00| A| 1.0|
|2021-10-01 00:00:00| B| 10.0|
|2021-10-01 00:00:00| C|100.0|
|2021-10-02 00:00:00| A| 3.0|
|2021-10-02 00:00:00| B| 10.0|
|2021-10-02 00:00:00| C|200.0|
|2021-10-03 00:00:00| A| 3.0|
|2021-10-03 00:00:00| B| 10.0|
|2021-10-03 00:00:00| C|200.0|
|2021-10-04 00:00:00| A| 3.0|
|2021-10-04 00:00:00| B| 46.0|
|2021-10-04 00:00:00| C|200.0|
|2021-10-05 00:00:00| A| 6.0|
|2021-10-05 00:00:00| B| 46.0|
|2021-10-05 00:00:00| C|200.0|
|2021-10-06 00:00:00| A| 6.0|
|2021-10-06 00:00:00| B| 46.0|
|2021-10-06 00:00:00| C|200.0|
|2021-10-07 00:00:00| A| 7.0|
|2021-10-07 00:00:00| B| 70.0|
|2021-10-07 00:00:00| C|700.0|
+-------------------+--------+-----+
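If you would rather have a date column than midnight timestamps in record_day, a final cast does it (a small optional tweak, assuming the pipeline above was assigned to result as shown; result_by_date is just an illustrative name):

# cast the truncated timestamp to a DateType column
result_by_date = result.withColumn("record_day", col("record_day").cast("date"))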