How to melt Spark DataFrame?

Is there an equivalent of the Pandas melt function in Apache Spark, in PySpark or at least in Scala?

Until now I have been running on a sample dataset in Python; now I want to use Spark for the entire dataset.

Heriberto asked 16/1, 2017 at 5:42 Comment(1)
See also unpivot in spark-sql/pyspark and Transpose column to row with SparkCopier

Spark >= 3.4

In Spark 3.4 or later you can use the built-in melt method:

(sdf
    .melt(
        ids=['A'], values=['B', 'C'], 
        variableColumnName="variable", 
        valueColumnName="value")
    .show())
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

This method is available across all APIs, so it can also be used in Scala:

sdf.melt(Array($"A"), Array($"B", $"C"), "variable", "value")

or in SQL:

SELECT * FROM sdf UNPIVOT (val FOR col IN (col_1, col_2))
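
To run the SQL form against the example DataFrame, it first has to be registered as a view; a minimal sketch (the view name sdf and the variable/value column names are just placeholders):

sdf.createOrReplaceTempView("sdf")  # hypothetical view name, anything works
spark.sql("SELECT * FROM sdf UNPIVOT (value FOR variable IN (B, C))").show()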

Spark 3.2 (Python only, requires Pandas and pyarrow)

(sdf
    .to_koalas()
    .melt(id_vars=['A'], value_vars=['B', 'C'])
    .to_spark()
    .show())
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+
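
Since the Koalas API ships inside PySpark 3.2 as pyspark.pandas, to_koalas can also be swapped for to_pandas_on_spark; a sketch assuming Spark 3.2+ with Pandas and pyarrow installed:

(sdf
    .to_pandas_on_spark()   # pandas-on-Spark frame instead of the Koalas one
    .melt(id_vars=['A'], value_vars=['B', 'C'])
    .to_spark()
    .show())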

Spark < 3.2

There is no built-in function (if you work with SQL and have Hive support enabled you can use the stack function, but it is not exposed in the DataFrame API and has no native implementation), but it is trivial to roll your own. Required imports:

from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable 

Example implementation:

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

And some tests (based on Pandas doctests):

import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

Note: For use with legacy Python versions remove type annotations.
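
For completeness, the stack function mentioned above can be used through selectExpr instead of the helper; a minimal sketch against the same sdf (assumes B and C share a compatible type):

# equivalent of melt(sdf, id_vars=['A'], value_vars=['B', 'C'])
sdf.selectExpr("A", "stack(2, 'B', B, 'C', C) as (variable, value)").show()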

Perbunan answered 16/1, 2017 at 9:56 Comment(6)
Your code adds backticks to the column names and then fails on the withColumn call. More details available here (#55782296)Nadabus
How does this benchmark against the stack option? As in: df.selectExpr('col1', 'stack(2, "col2", col2, "col3", col3) as (cols, values)')Tessie
This isn't a trivial answer. This is a genius one!Wareroom
Amazing answer. I've used this function many times without any problem.Building
This is truly amazing. explode works with columns containing lists, but creating "_vars_and_vals" as an array of key-value structs and then exploding it inside a withColumn statement is a very interesting approach. What @Wareroom said!Chantey
Spark 3.4 isn't released yet (is.spark.released.info) as of writing this comment. Is there any official announcement that you can point me to?Bison

Came across this question in my search for an implementation of melt in Spark for Scala.

Posting my Scala port in case someone also stumbles upon this.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
/** Extends the [[org.apache.spark.sql.DataFrame]] class
 *
 *  @param df the data frame to melt
 */
implicit class DataFrameFunctions(df: DataFrame) {

    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
     * 
     *  melt is (kind of) the inverse of pivot
     *  melt is currently (02/2017) not implemented in spark
     *
     *  @see reshape package in R (https://cran.r-project.org/web/packages/reshape/index.html)
     *  @see this is a scala adaptation of https://mcmap.net/q/246111/-how-to-melt-spark-dataframe
     *  
     *  @todo method overloading for simple calling
     *
     *  @param id_vars the columns to preserve
     *  @param value_vars the columns to melt
     *  @param var_name the name for the column holding the melted columns names
     *  @param value_name the name for the column holding the values of the melted columns
     *
     */

    def melt(
            id_vars: Seq[String], value_vars: Seq[String], 
            var_name: String = "variable", value_name: String = "value") : DataFrame = {

        // Create array<struct<variable: str, value: ...>>
        val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)

        // Add to the DataFrame and explode
        val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

        val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}

        return _tmp.select(cols: _*)

    }
}

Since I'm not that advanced in Scala, I'm sure there is room for improvement.

Any comments are welcome.

Crisscross answered 22/2, 2017 at 8:36 Comment(1)
Your code is okay, but I would advise replacing the for-yield constructions with plain map calls. For example: { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }} is the same as List(var_name, value_name).map(x => col("_vars_and_vals")(x).alias(x)), and for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) } can be written as value_vars.map(c => struct(lit(c).alias(var_name), col(c).alias(value_name))). for-yield is a more general construct in Scala than the for-comprehension in Python.Complected

Voted for user6910411's answer. It works as expected; however, it cannot handle None values well, so I refactored the melt function to the following:

from pyspark.sql.functions import col, create_map, explode, lit
from pyspark.sql import DataFrame
from typing import Iterable 
from itertools import chain

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create map<key: value>
    _vars_and_vals = create_map(
        list(chain.from_iterable([
            [lit(c), col(c)] for c in value_vars]
        ))
    )

    _tmp = df.select(*id_vars, explode(_vars_and_vals)) \
        .withColumnRenamed('key', var_name) \
        .withColumnRenamed('value', value_name)

    return _tmp

Testing with the following DataFrame:

import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6},
                   'D': {1: 7, 2: 9}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C', 'D'])

A   variable    value
0   a   B   1.0
1   b   B   3.0
2   c   B   5.0
3   a   C   2.0
4   b   C   4.0
5   c   C   6.0
6   a   D   NaN
7   b   D   7.0
8   c   D   9.0

sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C', 'D']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|  1.0|
|  a|       C|  2.0|
|  a|       D|  NaN|
|  b|       B|  3.0|
|  b|       C|  4.0|
|  b|       D|  7.0|
|  c|       B|  5.0|
|  c|       C|  6.0|
|  c|       D|  9.0|
+---+--------+-----+
Lyudmila answered 25/6, 2019 at 11:18 Comment(2)
How would this work if I am trying to pass in a list i.e 'someColumns' for the value_vars? I'm getting an 'Unsupported literal type class' error.Cookhouse
It works for me perfectly, nice job! @Budyn: what exactly are you passing into the value_vars parameter? I pass a list of strings (of column names), like this: df_long = melt(df_wide, id_vars=['id', 'date'], value_vars=['t1', 't2', 't3', 't4'])Chavers

UPD

Finally I've found the most effective implementation for me. It uses all the cluster resources in my YARN configuration.

from pyspark.sql.functions import explode

def melt(df):
    # Treat the first column as the id and melt all remaining columns
    sp = df.columns[1:]
    return (df
            .rdd
            # Pair the id with a list of (column name, value) tuples,
            # replacing None values with 0.0
            .map(lambda x: [str(x[0]), [(str(i[0]),
                                         float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]],
                 preservesPartitioning = True)
            .toDF()
            # One row per (column name, value) pair
            .withColumn('_2', explode('_2'))
            # Flatten the struct into separate columns
            .rdd.map(lambda x: [str(x[0]),
                                str(x[1][0]),
                                float(x[1][1] if x[1][1] else 0)],
                     preservesPartitioning = True)
            .toDF()
            )
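
A quick usage sketch (hypothetical call; assumes a frame like the example df built below, whose first column is the id and whose remaining columns are numeric):

melt(df).show()  # toDF() leaves the default names _1 (id), _2 (variable), _3 (value)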

For a very wide DataFrame I saw performance degrade at the _vars_and_vals generation step from user6910411's answer.

It was useful to implement melting via selectExpr instead:

import pandas as pd

columns = ['a', 'b', 'c', 'd', 'e', 'f']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
df.show()
+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|
|  4|  5|  6|  7|  9|  8|
|  7|  8|  9|  1|  2|  4|
|  8|  3|  9|  8|  7|  4|
+---+---+---+---+---+---+

cols = df.columns[1:]
df.selectExpr('a', "stack({}, {})".format(len(cols), ', '.join(("'{}', {}".format(i, i) for i in cols)))).show()
+---+----+----+
|  a|col0|col1|
+---+----+----+
|  1|   b|   2|
|  1|   c|   3|
|  1|   d|   4|
|  1|   e|   5|
|  1|   f|   6|
|  4|   b|   5|
|  4|   c|   6|
|  4|   d|   7|
|  4|   e|   9|
|  4|   f|   8|
|  7|   b|   8|
|  7|   c|   9|
...
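
The generated columns default to col0 and col1; aliasing the stack generator gives friendlier names. A sketch reusing the df and cols from above:

# 'variable' and 'value' are arbitrary output names
df.selectExpr(
    'a',
    "stack({}, {}) as (variable, value)".format(
        len(cols), ', '.join("'{}', {}".format(i, i) for i in cols))
).show()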
Sought answered 13/2, 2019 at 14:39 Comment(2)
I am getting a type mismatch: cannot resolve .. due to data type mismatch: Argument 2 (DoubleType) != Argument 6 (LongType); line 1 pos 0. Testing shows that stack seems to infer the type of col1 from the first few elements of col0. When, say, the values for d or f of col0 come in, there is a type mismatch. How would you solve that? I am trying stack({}, {})".format(len(cols), ', '.join(("'{}', cast({} as bigint)"... which seems to work, but I am not sure whether it is the correct and efficient way. I am having performance issues when stacking hundreds of columns, so efficiency is important.Sigh
@Sigh I've never hit such a problem in this case, but your solution sounds logical. You can also try my solution from the update.Sought

Use a list comprehension to build an array of structs holding the column names and values, then explode the new column using inline. Code below:

from pyspark.sql import functions as F

melted_df = (
    df
    # Create an array of structs of column names and corresponding values
    .withColumn('tab', F.array(*[
        F.struct(F.lit(x).alias('var'), F.col(x).alias('val'))
        for x in df.columns if x != 'A']))
    # Explode the array, one row per struct, with its fields as columns
    .selectExpr('A', "inline(tab)")
)

melted_df.show()

+---+---+---+
|  A|var|val|
+---+---+---+
|  a|  B|  1|
|  a|  C|  2|
|  b|  B|  3|
|  b|  C|  4|
|  c|  B|  5|
|  c|  C|  6|
+---+---+---+
Estrade answered 17/8, 2022 at 13:48 Comment(0)

1) Copy & paste
2) Change the first 2 variables

to_melt = {'latin', 'greek', 'chinese'}
new_names = ['lang', 'letter']

melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
)

A row with a null value is created if some of the source values are null. To remove such rows, add this:

.filter(f"!{new_names[1]} is null")

Full test:

from pyspark.sql import functions as F
df = spark.createDataFrame([(101, "A", "Σ", "西"), (102, "B", "Ω", "诶")], ['ID', 'latin', 'greek', 'chinese'])
df.show()
# +---+-----+-----+-------+
# | ID|latin|greek|chinese|
# +---+-----+-----+-------+
# |101|    A|    Σ|     西|
# |102|    B|    Ω|     诶|
# +---+-----+-----+-------+

to_melt = {'latin', 'greek', 'chinese'}
new_names = ['lang', 'letter']

melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
)

df.show()
# +---+-------+------+
# | ID|   lang|letter|
# +---+-------+------+
# |101|  latin|     A|
# |101|  greek|     Σ|
# |101|chinese|    西|
# |102|  latin|     B|
# |102|  greek|     Ω|
# |102|chinese|    诶|
# +---+-------+------+
Fiction answered 11/10, 2022 at 11:28 Comment(0)
