PySpark: forward fill with last observation for a DataFrame

Using Spark 1.5.1,

I've been trying to forward fill null values with the last known observation for one column of my DataFrame.

The column may start with a null value; in that case I would like to backward fill it with the first known observation. However, if that complicates the code too much, this point can be skipped.

In this post, a solution in Scala was provided for a very similar problem by zero323.

But I don't know Scala, and I haven't managed to "translate" it into PySpark API code. Is it possible to do this with PySpark?

Thanks for your help.

Below is a simple sample input:

| cookie_ID | Time       | User_ID |
| --------- | ---------- | ------- |
| 1         | 2015-12-01 | null    |
| 1         | 2015-12-02 | U1      |
| 1         | 2015-12-03 | U1      |
| 1         | 2015-12-04 | null    |
| 1         | 2015-12-05 | null    |
| 1         | 2015-12-06 | U2      |
| 1         | 2015-12-07 | null    |
| 1         | 2015-12-08 | U1      |
| 1         | 2015-12-09 | null    |
| 2         | 2015-12-03 | null    |
| 2         | 2015-12-04 | U3      |
| 2         | 2015-12-05 | null    |
| 2         | 2015-12-06 | U4      |

And the expected output:

| cookie_ID | Time       | User_ID |
| --------- | ---------- | ------- |
| 1         | 2015-12-01 | U1      |
| 1         | 2015-12-02 | U1      |
| 1         | 2015-12-03 | U1      |
| 1         | 2015-12-04 | U1      |
| 1         | 2015-12-05 | U1      |
| 1         | 2015-12-06 | U2      |
| 1         | 2015-12-07 | U2      |
| 1         | 2015-12-08 | U1      |
| 1         | 2015-12-09 | U1      |
| 2         | 2015-12-03 | U3      |
| 2         | 2015-12-04 | U3      |
| 2         | 2015-12-05 | U3      |
| 2         | 2015-12-06 | U4      |
Rankin answered 15/3, 2016 at 18:59 Comment(3)
I am not sure I get the logic. Is the relationship between user and cookie many-to-many? Also, how do you define the order? The order of rows is not particularly meaningful in Spark SQL (not that it is in any SQL-ish environment). - Ultramicrochemistry
Sorry, I forgot to include the timestamp in my example (I have edited it). I introduced the cookie_ID variable in the example to show that I have to forward fill null values by cookie. Thanks for your help. - Rankin
Did you ever find a solution to this? - Finstad

Another way to get this working is to try something like this:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window = (
    Window
    .partitionBy('cookie_id')
    .orderBy('Time')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# `joined` is the input DataFrame holding the cookie_id, Time and User_ID columns
final = (
    joined
    .withColumn('UserIDFilled', F.last('User_ID', ignorenulls=True).over(window))
)

This builds a window from the partition key and the ordering column, and tells the window to look back over all rows from the start of the partition up to and including the current row. At each row, it then returns the last non-null value in that frame, which is the current row's own value when that value is not null. Leading nulls in a partition stay null, because the frame contains no earlier non-null value to pick up.
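To make the window semantics concrete, here is a minimal plain-Python sketch (no Spark required) of what `F.last(..., ignorenulls=True)` computes over that frame. The helper name `forward_fill_rows` and the tuple layout are assumptions made for illustration; they are not part of the PySpark API.

```python
from collections import defaultdict


def forward_fill_rows(rows):
    """Plain-Python model of last(ignorenulls=True) over a running window.

    rows: iterable of (cookie_id, time, user_id) tuples; user_id may be None.
    Returns the rows sorted by (cookie_id, time) with the filled value appended.
    """
    # partitionBy('cookie_id'): bucket the rows by key
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[0]].append(row)

    filled = []
    for key in sorted(partitions):
        last_seen = None  # last non-null value seen so far in this partition
        # orderBy('Time'): ISO-formatted dates sort correctly as strings
        for cookie_id, time, user_id in sorted(partitions[key], key=lambda r: r[1]):
            if user_id is not None:
                last_seen = user_id
            filled.append((cookie_id, time, user_id, last_seen))
    return filled


sample = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-04", None),
    (1, "2015-12-06", "U2"),
    (1, "2015-12-07", None),
    (2, "2015-12-03", None),
    (2, "2015-12-04", "U3"),
    (2, "2015-12-05", None),
]
result = forward_fill_rows(sample)
for row in result:
    print(row)
```

The first row of each cookie keeps `None` in the last position, which mirrors the leading-null behaviour of the window version.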

Gyimah answered 31/10, 2020 at 23:46 Comment(0)

I hope you find this forward fill function useful. It is written using native PySpark functions only; neither UDFs nor RDDs are used (both are very slow, especially UDFs!).

Let's use the example provided by @Sid.

values = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-03", "U2"),
    (1, "2015-12-04", None),
    (1, "2015-12-05", None),
    (2, "2015-12-04", None),
    (2, "2015-12-03", None),
    (2, "2015-12-02", "U3"),
    (2, "2015-12-05", None),
] 

df = spark.createDataFrame(values, ['cookie_ID', 'Time', 'User_ID'])


Functions:

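# Imports needed by the two functions below.
# Note: this `sum` is the Spark column function and shadows Python's built-in sum.
from pyspark.sql.functions import col, collect_list, lit, sum, when
from pyspark.sql.types import StringType
from pyspark.sql.window import Window


def cum_sum(df, sum_col, order_col, cum_sum_col_nm='cum_sum'):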
    '''Find cumulative sum of a column. 
    Parameters 
    -----------
    sum_col : String 
        Column to perform cumulative sum. 
    order_col : List 
        Column/columns to sort for cumulative sum. 
    cum_sum_col_nm : String
        The name of the resulting cum_sum column. 

    Return
    -------
    df : DataFrame
        Dataframe with additional "cum_sum_col_nm". 

    '''
    df = df.withColumn('tmp', lit('tmp')) 

    windowval = (Window.partitionBy('tmp') 
                 .orderBy(order_col)
                 .rangeBetween(Window.unboundedPreceding, 0)) 

    # Use the requested output name instead of a hard-coded 'cum_sum'
    df = df.withColumn(cum_sum_col_nm, sum(sum_col).over(windowval).cast(StringType()))
    df = df.drop('tmp') 
    return df   


def forward_fill(df, order_col, fill_col, fill_col_name=None):
    '''Forward fill a column, ordered by a column or set of columns (order_col).
    Parameters:
    ------------
    df: Dataframe
    order_col: String or List of string
    fill_col: String (this version only works for a single column.)
    fill_col_name: String, optional
        Name of the filled column; defaults to 'ffill_<fill_col>'.

    Return:
    ---------
    df: Dataframe
        Return df with the filled column added.
    '''

    # "value" is a temporary column and 'constant' a placeholder for nulls; both exist only to enable the forward fill.
    df = df.withColumn('value', when(col(fill_col).isNull(), 0).otherwise(1))
    df = cum_sum(df, 'value', order_col).drop('value')  
    df = df.withColumn(fill_col, 
                when(col(fill_col).isNull(), 'constant').otherwise(col(fill_col))) 

    win = (Window.partitionBy('cum_sum') 
              .orderBy(order_col)) 

    if not fill_col_name:
        fill_col_name = 'ffill_{}'.format(fill_col)

    df = df.withColumn(fill_col_name, collect_list(fill_col).over(win)[0])
    df = df.drop('cum_sum')
    df = df.withColumn(fill_col_name, when(col(fill_col_name)=='constant', None).otherwise(col(fill_col_name)))
    df = df.withColumn(fill_col, when(col(fill_col)=='constant', None).otherwise(col(fill_col)))
    return df   

Let's see the results.

ffilled_df = forward_fill(df, 
                          order_col=['cookie_ID', 'Time'], 
                          fill_col='User_ID', 
                          fill_col_name = 'User_ID_ffil')
ffilled_df.sort(['cookie_ID', 'Time']).show()   
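The trick behind `forward_fill` is easier to see without Spark. The plain-Python sketch below (the name `ffill_by_group_id` is hypothetical, and the input is assumed to be sorted already) flags non-null values with 1, takes a running total so that every null inherits the group id of the last non-null value before it, and then takes the first element of each group.

```python
from itertools import accumulate


def ffill_by_group_id(values):
    """Forward fill a list that is already sorted in the desired order."""
    # 1 where a value is present, 0 where it is missing
    flags = [0 if v is None else 1 for v in values]
    # Running total: each null gets the group id of the last non-null before it
    group_ids = list(accumulate(flags))
    # The first element of each group is its only non-null value;
    # group 0 holds leading nulls and has nothing to propagate
    first_of_group = {}
    for gid, v in zip(group_ids, values):
        first_of_group.setdefault(gid, v)
    return group_ids, [first_of_group[g] for g in group_ids]


values = [None, "U1", "U1", "U2", None, None]
group_ids, filled = ffill_by_group_id(values)
print(group_ids)
print(filled)
```

One caveat, as far as I can tell from the code above: the Spark version computes the running total over the whole DataFrame ordered by `['cookie_ID', 'Time']`, without a per-cookie partition, so a cookie that starts with nulls would pick up the last value of the previous cookie in sort order.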


Autobiography answered 6/4, 2018 at 9:8 Comment(1)
here a solution without coding #38132482Houdan

Below is a PySpark version of the partitioned example code from "Spark / Scala: forward fill with last observation". It only works for data that can be partitioned.

Load the data

values = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-03", "U2"),
    (1, "2015-12-04", None),
    (1, "2015-12-05", None),
    (2, "2015-12-04", None),
    (2, "2015-12-03", None),
    (2, "2015-12-02", "U3"),
    (2, "2015-12-05", None),
]
rdd = sc.parallelize(values)
df = rdd.toDF(["cookie_id", "c_date", "user_id"])
df = df.withColumn("c_date", df.c_date.cast("date"))
df.show()

The DataFrame is

+---------+----------+-------+
|cookie_id|    c_date|user_id|
+---------+----------+-------+
|        1|2015-12-01|   null|
|        1|2015-12-02|     U1|
|        1|2015-12-02|     U1|
|        1|2015-12-03|     U2|
|        1|2015-12-04|   null|
|        1|2015-12-05|   null|
|        2|2015-12-04|   null|
|        2|2015-12-03|   null|
|        2|2015-12-02|     U3|
|        2|2015-12-05|   null|
+---------+----------+-------+

Column used to sort the partitions

# get the sort key
def getKey(item):
    return item.c_date

The fill function. Can be used to fill in multiple columns if necessary.

# fill function
def fill(x):
    out = []
    last_val = None
    for v in x:
        if v["user_id"] is None:
            data = [v["cookie_id"], v["c_date"], last_val]
        else:
            data = [v["cookie_id"], v["c_date"], v["user_id"]]
            last_val = v["user_id"]
        out.append(data)
    return out
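As a rough illustration of the multi-column case, the fill logic can be generalised as below. This is only a sketch: `fill_columns`, its `columns` parameter, and the `device` column are hypothetical, and the rows are plain dicts instead of Spark `Row` objects so that the snippet runs without a cluster.

```python
def fill_columns(rows, columns):
    """Forward fill the given columns across an already sorted list of dict rows."""
    last_vals = {c: None for c in columns}  # last non-null value seen per column
    out = []
    for row in rows:
        new_row = dict(row)  # copy so the input rows are not mutated
        for c in columns:
            if new_row[c] is None:
                new_row[c] = last_vals[c]
            else:
                last_vals[c] = new_row[c]
        out.append(new_row)
    return out


rows = [
    {"c_date": "2015-12-01", "user_id": None, "device": "phone"},
    {"c_date": "2015-12-02", "user_id": "U1", "device": None},
    {"c_date": "2015-12-03", "user_id": None, "device": None},
    {"c_date": "2015-12-04", "user_id": "U2", "device": "laptop"},
]
filled_rows = fill_columns(rows, ["user_id", "device"])
for r in filled_rows:
    print(r)
```

Each column keeps its own "last seen" value, so a gap in one column does not affect the others.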

Convert to rdd, partition, sort and fill the missing values

# Partition the data
rdd = df.rdd.groupBy(lambda x: x.cookie_id).mapValues(list)
# Sort the data by date
rdd = rdd.mapValues(lambda x: sorted(x, key=getKey))
# fill missing value and flatten
rdd = rdd.mapValues(fill).flatMapValues(lambda x: x)
# discard the key
rdd = rdd.map(lambda v: v[1])

Convert back to DataFrame

df_out = sqlContext.createDataFrame(rdd)
df_out.show()

The output is

+---+----------+----+
| _1|        _2|  _3|
+---+----------+----+
|  1|2015-12-01|null|
|  1|2015-12-02|  U1|
|  1|2015-12-02|  U1|
|  1|2015-12-03|  U2|
|  1|2015-12-04|  U2|
|  1|2015-12-05|  U2|
|  2|2015-12-02|  U3|
|  2|2015-12-03|  U3|
|  2|2015-12-04|  U3|
|  2|2015-12-05|  U3|
+---+----------+----+
Aurignacian answered 22/10, 2016 at 15:55 Comment(1)
here a solution without coding #38132482Houdan
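from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Forward filling: frame from the start of the partition up to the current row
w1 = Window.partitionBy('cookie_id').orderBy('c_date').rowsBetween(Window.unboundedPreceding, 0)
# Backward filling: frame covering the whole partition
w2 = w1.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Take the forward-filled value; fall back to the first non-null value for leading nulls
final_df = df.withColumn('UserIDFilled', F.coalesce(F.last('user_id', True).over(w1),
                                                    F.first('user_id', True).over(w2)))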

final_df.orderBy('cookie_id', 'c_date').show(truncate=False)

+---------+----------+-------+------------+
|cookie_id|c_date    |user_id|UserIDFilled|
+---------+----------+-------+------------+
|1        |2015-12-01|null   |U1          |
|1        |2015-12-02|U1     |U1          |
|1        |2015-12-02|U1     |U1          |
|1        |2015-12-03|U2     |U2          |
|1        |2015-12-04|null   |U2          |
|1        |2015-12-05|null   |U2          |
|2        |2015-12-02|U3     |U3          |
|2        |2015-12-03|null   |U3          |
|2        |2015-12-04|null   |U3          |
|2        |2015-12-05|null   |U3          |
+---------+----------+-------+------------+
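For readers who want to check the logic of the `coalesce` without a Spark session, here is a small plain-Python model of one partition. The function name `ffill_then_bfill` is an assumption for illustration, and the input list is taken to be already ordered by date.

```python
def ffill_then_bfill(values):
    """Forward fill, then use the first non-null value for any leading gaps."""
    # Counterpart of first(ignorenulls=True) over the whole partition
    first_non_null = next((v for v in values if v is not None), None)

    filled = []
    last_seen = None  # counterpart of last(ignorenulls=True) over the running frame
    for v in values:
        if v is not None:
            last_seen = v
        # coalesce(forward_value, first_non_null)
        filled.append(last_seen if last_seen is not None else first_non_null)
    return filled


print(ffill_then_bfill([None, "U1", "U1", "U2", None, None]))
print(ffill_then_bfill([None, None]))
```

A partition that contains only nulls stays null, since there is nothing to fill from in either direction.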
Consul answered 30/11, 2020 at 8:46 Comment(0)

Cloudera has released a library called spark-ts that offers a suite of useful methods for processing time series and sequential data in Spark. This library supports a number of time-windowed methods for imputing data points based on other data in the sequence.

http://blog.cloudera.com/blog/2015/12/spark-ts-a-new-library-for-analyzing-time-series-data-with-apache-spark/

Pearlinepearlman answered 7/8, 2016 at 15:8 Comment(0)
