In Spark 2.1+, you can compute the median with the functions percentile and percentile_approx. Both can be used in aggregations and with window functions, so, as you originally wanted, you can use rowsBetween() too.
Examples using PySpark:
from pyspark.sql import SparkSession, functions as F, Window as W
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 10),
     (1, 20),
     (1, 30),
     (1, 40),
     (1, 50),
     (2, 50)],
    ['c1', 'c2']
)
df = (
    df
    .withColumn(
        'moving_median_1',
        F.expr('percentile(c2, 0.5)').over(W.partitionBy('c1').orderBy('c2')))
    .withColumn(
        'moving_median_2',
        F.expr('percentile(c2, 0.5) over(partition by c1 order by c2)'))
    .withColumn(
        'moving_median_3_rows_1',
        F.expr('percentile(c2, 0.5)').over(W.partitionBy('c1').orderBy('c2').rowsBetween(-2, 0)))
    .withColumn(
        'moving_median_3_rows_2',
        F.expr('percentile(c2, 0.5) over(partition by c1 order by c2 rows between 2 preceding and current row)'))
)
df.show()
#+---+---+---------------+---------------+----------------------+----------------------+
#| c1| c2|moving_median_1|moving_median_2|moving_median_3_rows_1|moving_median_3_rows_2|
#+---+---+---------------+---------------+----------------------+----------------------+
#| 1| 10| 10.0| 10.0| 10.0| 10.0|
#| 1| 20| 15.0| 15.0| 15.0| 15.0|
#| 1| 30| 20.0| 20.0| 20.0| 20.0|
#| 1| 40| 25.0| 25.0| 30.0| 30.0|
#| 1| 50| 30.0| 30.0| 40.0| 40.0|
#| 2| 50| 50.0| 50.0| 50.0| 50.0|
#+---+---+---------------+---------------+----------------------+----------------------+
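Both functions also work in plain aggregations. A minimal sketch, reusing the df from above: the exact percentile has to buffer all values per group, so percentile_approx is usually the cheaper choice on large groups. Its optional third argument is the accuracy parameter (default 10000); larger values trade memory for precision.
# Exact vs. approximate median per group in a regular aggregation.
df.groupBy('c1').agg(
    F.expr('percentile(c2, 0.5)').alias('median_exact'),
    # accuracy=10000 is the default; shown here only to make it explicit
    F.expr('percentile_approx(c2, 0.5, 10000)').alias('median_approx'),
).show()
Note that percentile returns a double (it interpolates between values), while percentile_approx returns one of the actual values from the column, so its result has the input column's type.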