Forward fill missing values in Spark/Python
I am attempting to fill in missing values in my Spark dataframe with the previous non-null value (if it exists). I've done this type of thing in Python/Pandas but my data is too big for Pandas (on a small cluster) and I'm a Spark noob. Is this something Spark can do? Can it do it for multiple columns? If so, how? If not, any suggestions for alternative approaches within the whole Hadoop suite of tools?

Thanks!

Datary answered 30/6, 2016 at 19:46 Comment(6)
Looks like this has been asked before, without much success.Tati
@Tati - yeah, I've noticed :/ I would think that this would be possible though.Datary
I believe it's possible using Window, but I'm actually working my way through that conceptually right now. Although if your data is large enough to need a cluster, why impute these instead of dropping the observations? Keep in mind when you impute that you're making up data that doesn't exist - it has its uses, but you should still avoid it if you can.Blayze
It looks like you can do it if you convert to an RDD first, then back to a dataframe: #33621819Blayze
@JeffL. - In this project I'm going to have to forward fill because while the data does not exist for those date/time in the dataset, it is assumed in this problem that values are repeated until the value changes. The link you sent is interesting....might have to learn Scala first :SDatary
Yeah, I don't know Scala either. The guy who posted the answer though, @zero323, is very active on Spark questions, so he might have input here eventually.Blayze
I've found a solution that works without additional coding, by using a Window, described here. So Jeff was right, there is a solution. The full code is below; I'll briefly explain what it does, and for more details just look at the blog.

from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

# define the window
window = Window.orderBy('time')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_column_temperature = last(df6['temperature'], ignorenulls=True).over(window)

# do the fill 
spark_df_filled = df6.withColumn('temperature_filled',  filled_column_temperature)

So the idea is to define a Window sliding through the data (more on sliding windows here) which always contains the current row and ALL previous ones:

    window = Window.orderBy('time')\
                   .rowsBetween(-sys.maxsize, 0)

Note that we sort by time, so the data is in the correct order. Also note that using "-sys.maxsize" ensures that the window always includes all previous data and grows continuously as it traverses the data top-down, but there might be more efficient solutions.
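For reference, here is a minimal sketch of the same window written with the built-in boundary constants Window.unboundedPreceding and Window.currentRow, which are equivalent to -sys.maxsize and 0 but arguably clearer:

    from pyspark.sql import Window

    # Same window as above, spelled with the built-in boundary constants:
    # everything from the start of the (ordered) data up to the current row.
    window = Window.orderBy('time')\
                   .rowsBetween(Window.unboundedPreceding, Window.currentRow)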

Using the "last" function, we are always addressing the last row in that window. By passing "ignorenulls=True" we define that if the current row is null, then the function will return the most recent (last) non-null value in the window. Otherwise the actual row's value is used.

Done.
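To make the behaviour concrete, here is a small, self-contained example with hypothetical toy data (not the original df6), showing the nulls being replaced by the previous non-null temperature:

    import sys
    from pyspark.sql import SparkSession, Window
    from pyspark.sql.functions import last

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical toy data: temperature is missing at times 2, 3 and 5.
    df6 = spark.createDataFrame(
        [(1, 20.0), (2, None), (3, None), (4, 25.0), (5, None)],
        ['time', 'temperature'])

    window = Window.orderBy('time').rowsBetween(-sys.maxsize, 0)

    df6.withColumn('temperature_filled',
                   last('temperature', ignorenulls=True).over(window)).show()

    # Expected values (display formatting may differ):
    #   time=1 -> 20.0, time=2 -> 20.0, time=3 -> 20.0,
    #   time=4 -> 25.0, time=5 -> 25.0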

Christner answered 19/5, 2018 at 5:51 Comment(5)
Better to use Window.unboundedPreceding instead of -sys.maxsize spark.apache.org/docs/latest/api/python/…Moten
this solution works well however when trying to persist the data I get the following error at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200) at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200) at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200) anyone have a work around?Murdoch
@Romeo Kienzler #56369247 if you want to take a lookMurdoch
If the data contains multiple groups, this will not work without partitionBy, as it will pick up the last value across the whole column (see the sketch after these comments).Pomposity
Note that this would work only if each row is regularly spaced. The anti-example would be: df = spark.createDataFrame([ ('mkt', 1, 1), ('mkt', 2, 2), ('mkt', 4, 4), ('mkt', 5, 5), ('mkt', 8, 8) ], schema=['mkt', 'days', 'delta'])Borrowing
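Following up on the partitionBy comment above: if the dataframe holds several independent series (say, one per sensor), the window has to be partitioned by the group column, otherwise values bleed across groups. A minimal sketch, assuming a hypothetical 'sensor' column and using Window.unboundedPreceding as suggested; the same loop also covers the multi-column part of the question (column names here are illustrative):

    from pyspark.sql import Window
    from pyspark.sql.functions import last

    # Hypothetical: 'sensor' identifies the series, 'time' orders rows within it.
    window = (Window.partitionBy('sensor')
                    .orderBy('time')
                    .rowsBetween(Window.unboundedPreceding, Window.currentRow))

    # Forward-fill several columns at once by reusing the same window.
    for c in ['temperature', 'pressure']:
        df6 = df6.withColumn(c + '_filled', last(c, ignorenulls=True).over(window))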
