PySpark: How to Append Dataframes in For Loop

I am performing a rolling median calculation on individual time-series DataFrames, and then I want to concatenate/append the results into a single DataFrame.
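
For reference, the imports and the window spec w used in the loop below were omitted; a minimal sketch of plausible definitions (the window's ordering column and frame size are assumptions, not from the original code) would be:

import numpy as np
from pyspark.sql import Window
from pyspark.sql.functions import udf, col, collect_list
from pyspark.sql.types import FloatType

# Hypothetical window spec: ordered by date, covering the current row plus the
# six preceding rows. The real w may also partition by other columns, as the
# comments below suggest.
w = Window.orderBy("date").rowsBetween(-6, 0)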

# UDF for rolling median
median_udf = udf(lambda x: float(np.median(x)), FloatType())

series_list = ['0620', '5914']
SeriesAppend=[]

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list", 
        collect_list("metric").over(w)) \
        .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

SeriesAppend

[DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array, rolling_median: float], DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array, rolling_median: float]]

When I attempt to call .show() on SeriesAppend:

Traceback (most recent call last):
AttributeError: 'list' object has no attribute 'show'

I realize this is saying the object is a list of dataframes. How do I convert to a single dataframe?

I know that the following solution works for an explicit number of dataframes, but I want my for-loop to be agnostic to the number of dataframes:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)

Is there a way to generalize this to non-explicit dataframe names?

Graehme answered 29/5, 2019 at 15:2 Comment(6)
I guess you need union. Have a look at this answer; a method to union several DataFrames from a list is explained there. – Esra
Union them all together. One way is to use functools.reduce and do the following: reduce(lambda a, b: a.union(b), SeriesAppend[1:], SeriesAppend[0]) – Hoplite
Possible duplicate of Spark unionAll multiple dataframes. The second answer is for PySpark. – Hoplite
If you add "ID" into your window w as another partitionBy argument, you do not need to do the for loop and union at all. Just subset the dataframe into the ids you want test_df = test_df.where(col("ID").isin(series_list)) and you are good to go.Centaurus
Richard, that suggestion would work, but I will not know all my IDs. For instance, there will be somewhere around 30k series, but the exact N is not determined. – Graehme
@Graehme What do you mean by an explicit number of dataframes? The point of using reduce is to perform the function (here, union) as many times as you need it. If you do df = reduce(DataFrame.unionAll, SeriesAppend) outside of the for loop, you don't need to specify the number of dataframes anywhere. Or is there something else I missed/don't understand? – Esra
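
A minimal sketch of that no-loop suggestion from the comments, assuming the same median_udf, test_df, and column names as in the question (the window frame size is illustrative):

from pyspark.sql import Window
from pyspark.sql.functions import col, collect_list

# Partition the window by ID so every series gets its own rolling median in a
# single pass, with no per-series loop or union.
w_by_id = Window.partitionBy("ID").orderBy("date").rowsBetween(-6, 0)

# The .isin filter is optional; drop it to process every series.
result = test_df.where(col("ID").isin(series_list)) \
    .withColumn("list", collect_list("metric").over(w_by_id)) \
    .withColumn("rolling_median", median_udf("list"))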

Thanks everyone! To sum up, the solution uses reduce and unionAll:

from functools import reduce
from pyspark.sql import DataFrame

# median_udf, series_list, test_df and the window w are defined as in the question
SeriesAppend = []

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list", 
         collect_list("metric").over(w)) \
         .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

df_series = reduce(DataFrame.unionAll, SeriesAppend)
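
With the list collapsed into a single DataFrame, the call that originally failed now works, for example:

# df_series is now one DataFrame covering every series, so show() works on it
df_series.show(10)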
Graehme answered 29/5, 2019 at 20:30 Comment(3)
You should add, in your answer, the lines from functools import reduce and from pyspark.sql import DataFrame, so people don't have to look further up. – Vasty
@Vasty Thanks, I've added the imports to the solution. – Graehme
Thanks. Your answer's been pretty useful for me. – Vasty

Another option is to union your DataFrames as you loop, rather than collecting them in a list and unioning afterwards. Set a unioned_df variable to None before the loop; on the first iteration, assign the current DataFrame to unioned_df, and on every subsequent iteration union the current DataFrame into the (now existing) unioned_df.

# UDF for rolling median (imports, test_df and the window w are as in the question)
median_udf = udf(lambda x: float(np.median(x)), FloatType())

series_list = ['0620', '5914']
unioned_df = None

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list", collect_list("metric").over(w)) \
                                 .withColumn("rolling_median", median_udf("list"))

    # If unioned_df doesn't exist, create it using current iteration of series_sorted.
    # Otherwise append current iteration of series_sorted to the existing unioned_df.
    if unioned_df is None:
        unioned_df = series_sorted
    else:
        unioned_df = unioned_df.union(series_sorted)
Cheng answered 15/8, 2022 at 10:31 Comment(1)
Thanks, I think this is the more straightforward answer and what I instinctively thought of, but @mwhee's answer above seems more elegant. Do you know which of the two is more "pyspark-y"? – Padraig
