PySpark Dataframe melt columns into rows
Asked Answered
C

5

7

As the subject describes, I have a PySpark Dataframe that I need to melt three columns into rows. Each column essentially represents a single fact in a category. The ultimate goal is to aggregate the data into a single total per category.

There are tens of millions of rows in this dataframe, so I need a way to do the transformation on the spark cluster without bringing back any data to the driver (Jupyter in this case).

Here is an extract of my dataframe for just a few stores: +-----------+----------------+-----------------+----------------+ | store_id |qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs| +-----------+----------------+-----------------+----------------+ | 100| 30| 105| 35| | 200| 55| 85| 65| | 300| 20| 125| 90| +-----------+----------------+-----------------+----------------+

Here is the desired resulting dataframe, multiple rows per store, where the columns of the original dataframe have been melted into rows of the new dataframe, with one row per original column in a new category column: +-----------+--------+-----------+ | product_id|CATEGORY|qty_on_hand| +-----------+--------+-----------+ | 100| milk| 30| | 100| bread| 105| | 100| eggs| 35| | 200| milk| 55| | 200| bread| 85| | 200| eggs| 65| | 300| milk| 20| | 300| bread| 125| | 300| eggs| 90| +-----------+--------+-----------+

Ultimately, I want to aggregate the resulting dataframe to get the totals per category: +--------+-----------------+ |CATEGORY|total_qty_on_hand| +--------+-----------------+ | milk| 105| | bread| 315| | eggs| 190| +--------+-----------------+

UPDATE: There is a suggestion that this question is a duplicate and can be answered here. This is not the case, as the solution casts rows to columns and I need to do the reverse, melt columns into rows.

Create answered 27/3, 2019 at 13:10 Comment(1)
Possible duplicate of How to melt Spark DataFrame?Pedraza
L
8

We can use explode() function to solve this issue. In Python, the same thing can be done with melt

# Loading the requisite packages 
from pyspark.sql.functions import col, explode, array, struct, expr, sum, lit        
# Creating the DataFrame
df = sqlContext.createDataFrame([(100,30,105,35),(200,55,85,65),(300,20,125,90)],('store_id','qty_on_hand_milk','qty_on_hand_bread','qty_on_hand_eggs'))
df.show()
+--------+----------------+-----------------+----------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

Writing the function below, which shall explode this DataFrame:

def to_explode(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("CATEGORY"), col(c).alias("qty_on_hand")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.CATEGORY", "kvs.qty_on_hand"])

Applying the function on this DataFrame to explode it-

df = to_explode(df, ['store_id'])\
     .drop('store_id')
df.show()
+-----------------+-----------+
|         CATEGORY|qty_on_hand|
+-----------------+-----------+
| qty_on_hand_milk|         30|
|qty_on_hand_bread|        105|
| qty_on_hand_eggs|         35|
| qty_on_hand_milk|         55|
|qty_on_hand_bread|         85|
| qty_on_hand_eggs|         65|
| qty_on_hand_milk|         20|
|qty_on_hand_bread|        125|
| qty_on_hand_eggs|         90|
+-----------------+-----------+

Now, we need to remove the string qty_on_hand_ from CATEGORY column. It can be done using expr() function. Note expr follows 1 based indexing for the substring, as opposed to 0 -

df = df.withColumn('CATEGORY',expr('substring(CATEGORY, 13)'))
df.show()
+--------+-----------+
|CATEGORY|qty_on_hand|
+--------+-----------+
|    milk|         30|
|   bread|        105|
|    eggs|         35|
|    milk|         55|
|   bread|         85|
|    eggs|         65|
|    milk|         20|
|   bread|        125|
|    eggs|         90|
+--------+-----------+

Finally, aggregating the column qty_on_hand grouped by CATEGORY using agg() function -

df = df.groupBy(['CATEGORY']).agg(sum('qty_on_hand').alias('total_qty_on_hand'))
df.show()
+--------+-----------------+
|CATEGORY|total_qty_on_hand|
+--------+-----------------+
|    eggs|              190|
|   bread|              315|
|    milk|              105|
+--------+-----------------+
Lambent answered 27/3, 2019 at 19:3 Comment(2)
Thanks @cph_sto. I have confirmed this works. Is there any way to remove the restriction that all the columns MUST have the same data types?Create
No, not that I am aware of.Lambent
R
8

I think you should use array and explode to do this, you do not need any complex logic with UDFs or custom functions.

array will combine columns into a single column, or annotate columns.

explode will convert an array column into a set of rows.

All you need to do is:

  • annotate each column with you custom label (eg. 'milk')
  • combine your labelled columns into a single column of 'array' type
  • explode the labels column to generate labelled rows
  • drop irrelevant columns
df = (
    df.withColumn('labels', F.explode(                         # <-- Split into rows
        F.array(                                               # <-- Combine columns
            F.array(F.lit('milk'), F.col('qty_on_hand_milk')), # <-- Annotate column
            F.array(F.lit('bread'), F.col('qty_on_hand_bread')),
            F.array(F.lit('eggs'), F.col('qty_on_hand_eggs')),
        )
    ))
    .withColumn('CATEGORY', F.col('labels')[0])
    .withColumn('qty_on_hand', F.col('labels')[1])
).select('store_id', 'CATEGORY', 'qty_on_hand')

Note how you can pull out elements of an array column simply using col('foo')[INDEX]; there is no specific need to split them out into separate columns.

This approach is also robust over different data types, because it doesn't try to force the same schema over every row (unlike using a struct).

eg. If 'qty_on_hand_bread' is a string, this still works, the resulting schema will just be:

root
 |-- store_id: long (nullable = false)
 |-- CATEGORY: string (nullable = true)
 |-- qty_on_hand: string (nullable = true) <-- Picks best schema on the fly

Here is the same code, step by step to make it obvious what's happening here:

import databricks.koalas as ks
import pyspark.sql.functions as F

# You don't need koalas, it's just less verbose for adhoc dataframes
df = ks.DataFrame({
    "store_id": [100, 200, 300],
    "qty_on_hand_milk": [30, 55, 20],
    "qty_on_hand_bread": [105, 85, 125],
    "qty_on_hand_eggs": [35, 65, 90],
}).to_spark()
df.show()

# Annotate each column with your custom label per row. ie. v -> ['label', v]
df = df.withColumn('label1', F.array(F.lit('milk'), F.col('qty_on_hand_milk')))
df = df.withColumn('label2', F.array(F.lit('bread'), F.col('qty_on_hand_bread')))
df = df.withColumn('label3', F.array(F.lit('eggs'), F.col('qty_on_hand_eggs')))
df.show()

# Create a new column which combines the labeled values in a single column
df = df.withColumn('labels', F.array('label1', 'label2', 'label3'))
df.show()

# Split into individual rows
df = df.withColumn('labels', F.explode('labels'))
df.show()

# You can now do whatever you want with your labelled rows, eg. split them into new columns
df = df.withColumn('CATEGORY', F.col('labels')[0])
df = df.withColumn('qty_on_hand', F.col('labels')[1])
df.show()

...and the output from each step:

|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

+--------+----------------+-----------------+----------------+----------+------------+----------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|
+--------+----------------+-----------------+----------------+----------+------------+----------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|
+--------+----------------+-----------------+----------------+----------+------------+----------+

+--------+----------------+-----------------+----------------+----------+------------+----------+--------------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|              labels|
+--------+----------------+-----------------+----------------+----------+------------+----------+--------------------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|[[milk, 30], [bre...|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|[[milk, 55], [bre...|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|[[milk, 20], [bre...|
+--------+----------------+-----------------+----------------+----------+------------+----------+--------------------+

+--------+----------------+-----------------+----------------+----------+------------+----------+------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|      labels|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [milk, 30]|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|[bread, 105]|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [eggs, 35]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [milk, 55]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]| [bread, 85]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [eggs, 65]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [milk, 20]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|[bread, 125]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [eggs, 90]|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+

+--------+----------------+-----------------+----------------+----------+------------+----------+------------+--------+-----------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|      labels|CATEGORY|qty_on_hand|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+--------+-----------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [milk, 30]|    milk|         30|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|[bread, 105]|   bread|        105|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [eggs, 35]|    eggs|         35|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [milk, 55]|    milk|         55|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]| [bread, 85]|   bread|         85|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [eggs, 65]|    eggs|         65|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [milk, 20]|    milk|         20|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|[bread, 125]|   bread|        125|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [eggs, 90]|    eggs|         90|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+--------+-----------+

+--------+--------+-----------+
|store_id|CATEGORY|qty_on_hand|
+--------+--------+-----------+
|     100|    milk|         30|
|     100|   bread|        105|
|     100|    eggs|         35|
|     200|    milk|         55|
|     200|   bread|         85|
|     200|    eggs|         65|
|     300|    milk|         20|
|     300|   bread|        125|
|     300|    eggs|         90|
+--------+--------+-----------+
Reber answered 30/9, 2020 at 3:42 Comment(1)
F.array() should use columns with the same type, no? See hereIsidore
C
0

A possible way of doing this using - col,when, functions modules of pyspark

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import StringType
>>> concat_udf = F.udf(lambda cols: "".join([str(x) if x is not None else "*" for x in cols]), StringType())

>>> rdd = sc.parallelize([[100,30,105,35],[200,55,85,65],[300,20,125,90]])
>>> df = rdd.toDF(['store_id','qty_on_hand_milk','qty_on_hand_bread','qty_on_hand_eggs'])

>>> df.show()
+--------+----------------+-----------------+----------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

#adding one more column with arrayed values of all three columns
>>> df_1=df.withColumn("new_col", concat_udf(F.array("qty_on_hand_milk", "qty_on_hand_bread","qty_on_hand_eggs")))
#convert it into array<int> for carrying out agg operations
>>> df_2=df_1.withColumn("new_col_1",split(col("new_col"), ",\s*").cast("array<int>").alias("new_col_1"))
#posexplode gives you the position along with usual explode which helps in categorizing
>>> df_3=df_2.select("store_id",  posexplode("new_col_1").alias("col_1","qty"))
#if else conditioning for category column
>>> df_3.withColumn("category",F.when(col("col_1") == 0, "milk").when(col("col_1") == 1, "bread").otherwise("eggs")).select("store_id","category","qty").show()
+--------+--------+---+
|store_id|category|qty|
+--------+--------+---+
|     100|    milk| 30|
|     100|   bread|105|
|     100|    eggs| 35|
|     200|    milk| 55|
|     200|   bread| 85|
|     200|    eggs| 65|
|     300|    milk| 20|
|     300|   bread|125|
|     300|    eggs| 90|
+--------+--------+---+

#aggregating to find sum
>>> df_3.withColumn("category",F.when(col("col_1") == 0, "milk").when(col("col_1") == 1, "bread").otherwise("eggs")).select("category","qty").groupBy('category').sum().show()
+--------+--------+
|category|sum(qty)|
+--------+--------+
|    eggs|     190|
|   bread|     315|
|    milk|     105|
+--------+--------+
>>> df_3.printSchema()
root
 |-- store_id: long (nullable = true)
 |-- col_1: integer (nullable = false)
 |-- qty: integer (nullable = true)
Certificate answered 27/3, 2019 at 15:25 Comment(3)
Thanks @Jim Todd. Do the columns all have to be the same data type?Create
Also, I'm tring this solution but getting an error: NameError: name 'concat_udf' is not definedCreate
Added the code for concat_udf which I missed previously. Also, you can see all the columns in the final dataframe are of some integer data type. This will be useful as you are aggregating at last.Certificate
D
0

Here is a function that implements it

from pyspark.sql import functions as F

def melt(df,cols,alias=('key','value')):
  other = [col for col in df.columns if col not in cols]
  for c in cols:
    df = df.withColumn(c, F.expr(f'map("{c}", cast({c} as double))'))
  df = df.withColumn('melted_cols', F.map_concat(*cols))
  return df.select(*other,F.explode('melted_cols').alias(*alias))
Dilative answered 17/2, 2022 at 17:29 Comment(0)
S
0

Belated answer. List comprehension and the inline function can melt the df.

    (#Create struct of the column names and col values
  df.withColumn('tab',F.array(*[F.struct(F.lit(k.replace('qty_on_hand_','')).alias('CATEGORY'), F.col(k).alias('qty_on_hand')) for k in df.columns if k!='store_id']))
 #Explode using the inline function
  .selectExpr('store_id as product_id',"inline(tab)")
  #groupby and sum
  .groupby('CATEGORY').agg(sum('qty_on_hand').alias('total_qty_on_hand'))
).show()


+--------+-----------------+
|CATEGORY|total_qty_on_hand|
+--------+-----------------+
|    eggs|              190|
|   bread|              315|
|    milk|              105|
+--------+-----------------+
Slew answered 17/8, 2022 at 13:33 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.