I am performing a rolling median calculation on individual time series dataframes, then I want to concat/append the results.
import numpy as np
from pyspark.sql.functions import udf, col, collect_list
from pyspark.sql.types import FloatType

# UDF for rolling median
median_udf = udf(lambda x: float(np.median(x)), FloatType())

series_list = ['0620', '5914']
SeriesAppend = []

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, series.date).persist()
    # Calculate rolling median (w is a Window spec defined earlier)
    series_sorted = series_sorted.withColumn("list", collect_list("metric").over(w)) \
                                 .withColumn("rolling_median", median_udf("list"))
    SeriesAppend.append(series_sorted)
SeriesAppend
[DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array, rolling_median: float], DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array, rolling_median: float]]
When I attempt to .show() on SeriesAppend, I get:

Traceback (most recent call last):
AttributeError: 'list' object has no attribute 'show'
I realize this is saying the object is a list of dataframes. How do I convert to a single dataframe?
I know that the following solution works for an explicit number of dataframes, but I want my for-loop to be agnostic to the number of dataframes:
from functools import reduce
from pyspark.sql import DataFrame
dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)
Is there a way to generalize this to non-explicit dataframe names?
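For intuition, the reduce pattern above already generalizes to any number of dataframes, because it folds a binary operation over the whole list. Here is a minimal pure-Python sketch of the same idea, using plain lists of rows as stand-ins for dataframes (not Spark code, just an illustration of how reduce scales with list length):

```python
from functools import reduce

# Stand-in for DataFrame.unionAll: any binary "combine" function works.
# Here we concatenate lists of rows, mimicking a row-wise union.
def union_all(a, b):
    return a + b

# An arbitrary number of "dataframes" (lists of rows), as built by a loop
frames = [[("0620", 1.0)], [("5914", 2.0)], [("7777", 3.0)]]

# reduce applies union_all pairwise across the list, however long it is
combined = reduce(union_all, frames)
print(combined)  # all rows from every frame, in order
```

The same call shape works with `reduce(DataFrame.unionAll, SeriesAppend)`: the loop can append any number of dataframes and nothing downstream needs to know the count.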
Comments:

- Use union. Have a look at this answer, where a method to union several dataframes from a list is shown. – Esra
- Use functools.reduce and do the following: reduce(lambda a, b: a.union(b), SeriesAppend[1:], SeriesAppend[0]) – Hoplite
- If you add "ID" into your window w as another partitionBy argument, you do not need the for loop and union at all. Just subset the dataframe to the ids you want, test_df = test_df.where(col("ID").isin(series_list)), and you are good to go. – Centaurus
- The point of reduce is to perform the function (here, union) as many times as you need. If you do df = reduce(DataFrame.unionAll, SeriesAppend) outside of the for loop, you don't need to specify the number of dataframes anywhere. Or is there something else I missed/don't understand? – Esra
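The comment about adding "ID" to the window's partitionBy is the key simplification: a partitioned window computes the rolling median per ID in one pass, so the loop and union disappear. As a rough illustration of what that partitioning does, here is a plain-Python sketch (using stdlib statistics.median in place of np.median; the window size of 3 and the sample rows are made up for the example):

```python
from collections import defaultdict
from statistics import median

def rolling_median_per_id(rows, window=3):
    """rows: (id, date, metric) tuples. Returns {id: [rolling medians]}.

    Mimics Window.partitionBy("ID").orderBy("date")
    .rowsBetween(-(window - 1), 0): each median covers the current row
    and up to window-1 preceding rows within the same ID partition."""
    groups = defaultdict(list)
    # Sorting by (id, date) then grouping emulates partitionBy + orderBy
    for id_, date, metric in sorted(rows, key=lambda r: (r[0], r[1])):
        groups[id_].append(metric)
    return {
        id_: [float(median(metrics[max(0, i - window + 1): i + 1]))
              for i in range(len(metrics))]
        for id_, metrics in groups.items()
    }

rows = [("0620", 1, 10.0), ("0620", 2, 30.0), ("0620", 3, 20.0),
        ("5914", 1, 5.0), ("5914", 2, 15.0)]
print(rolling_median_per_id(rows))
```

In Spark terms, the equivalent would be a single withColumn over a window partitioned by "ID" on the pre-filtered test_df, with no Python loop at all.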