Is there an equivalent of Pandas Melt function in Apache Spark in PySpark or at least in Scala?
I was running a sample dataset till now in Python and now I want to use Spark for the entire dataset.
Is there an equivalent of Pandas Melt function in Apache Spark in PySpark or at least in Scala?
I was running a sample dataset till now in Python and now I want to use Spark for the entire dataset.
Spark >= 3.4
In Spark 3.4 or later you can use built-in melt
method
(sdf
.melt(
ids=['A'], values=['B', 'C'],
variableColumnName="variable",
valueColumnName="value")
.show())
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1|
| a| C| 2|
| b| B| 3|
| b| C| 4|
| c| B| 5|
| c| C| 6|
+---+--------+-----+
This method is available across all APIs so could be used in Scala
sdf.melt(Array($"A"), Array($"B", $"C"), "variable", "value")
or SQL
SELECT * FROM sdf UNPIVOT (val FOR col in (col_1, col_2))
Spark 3.2 (Python only, requires Pandas and pyarrow)
(sdf
.to_koalas()
.melt(id_vars=['A'], value_vars=['B', 'C'])
.to_spark()
.show())
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1|
| a| C| 2|
| b| B| 3|
| b| C| 4|
| c| B| 5|
| c| C| 6|
+---+--------+-----+
Spark < 3.2
There is no built-in function (if you work with SQL and Hive support enabled you can use stack
function, but it is not exposed in Spark and has no native implementation) but it is trivial to roll your own. Required imports:
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable
Example implementation:
def melt(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str="variable", value_name: str="value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
# Create array<struct<variable: str, value: ...>>
_vars_and_vals = array(*(
struct(lit(c).alias(var_name), col(c).alias(value_name))
for c in value_vars))
# Add to the DataFrame and explode
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
cols = id_vars + [
col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
return _tmp.select(*cols)
And some tests (based on Pandas doctests):
import pandas as pd
pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
'B': {0: 1, 1: 3, 2: 5},
'C': {0: 2, 1: 4, 2: 6}})
pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
A variable value
0 a B 1
1 b B 3
2 c B 5
3 a C 2
4 b C 4
5 c C 6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1|
| a| C| 2|
| b| B| 3|
| b| C| 4|
| c| B| 5|
| c| C| 6|
+---+--------+-----+
Note: For use with legacy Python versions remove type annotations.
Related:
withColumn
call. More ref available here(#55782296) –
Nadabus stack
option? as in: df.selectExpr('col1', 'stack(2, "col2", col2, "col3", col3) as (cols, values)')
–
Tessie explode
works with columns containing lists but creating the array "_vars_and_vals" as a key-value pair array of structure and then using it in a withColumn statement within explode is a very interesting behavior. What @Wareroom said! –
Chantey Came across this question in my search for an implementation of melt
in Spark for Scala.
Posting my Scala port in case someone also stumbles upon this.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame}
/** Extends the [[org.apache.spark.sql.DataFrame]] class
*
* @param df the data frame to melt
*/
implicit class DataFrameFunctions(df: DataFrame) {
/** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
*
* melt is (kind of) the inverse of pivot
* melt is currently (02/2017) not implemented in spark
*
* @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html)
* @see this is a scala adaptation of https://mcmap.net/q/246111/-how-to-melt-spark-dataframe
*
* @todo method overloading for simple calling
*
* @param id_vars the columns to preserve
* @param value_vars the columns to melt
* @param var_name the name for the column holding the melted columns names
* @param value_name the name for the column holding the values of the melted columns
*
*/
def melt(
id_vars: Seq[String], value_vars: Seq[String],
var_name: String = "variable", value_name: String = "value") : DataFrame = {
// Create array<struct<variable: str, value: ...>>
val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)
// Add to the DataFrame and explode
val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}
return _tmp.select(cols: _*)
}
}
Since I'm am not that advanced considering Scala
, I'm sure there is room for improvement.
Any comments are welcome.
for-yield
constructions just to map
functions, for-example: { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}
is the same as List(var_name, value_name).map(x => col("_vars_and_vals")(x).alias(x))
and for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }
can be written so: value_vars.map(c => struct(lit(c).alias(var_name), col(c).alias(value_name)))
. for-yield is more general thing in scala than for-comprehension in python. –
Complected Voted for user6910411's answer. It works as expected, however, it cannot handle None values well. thus I refactored his melt function to the following:
from pyspark.sql.functions import array, col, explode, lit
from pyspark.sql.functions import create_map
from pyspark.sql import DataFrame
from typing import Iterable
from itertools import chain
def melt(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str="variable", value_name: str="value") -> DataFrame:
"""Convert :class:`DataFrame` from wide to long format."""
# Create map<key: value>
_vars_and_vals = create_map(
list(chain.from_iterable([
[lit(c), col(c)] for c in value_vars]
))
)
_tmp = df.select(*id_vars, explode(_vars_and_vals)) \
.withColumnRenamed('key', var_name) \
.withColumnRenamed('value', value_name)
return _tmp
Test is with the following dataframe:
import pandas as pd
pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
'B': {0: 1, 1: 3, 2: 5},
'C': {0: 2, 1: 4, 2: 6},
'D': {1: 7, 2: 9}})
pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C', 'D'])
A variable value
0 a B 1.0
1 b B 3.0
2 c B 5.0
3 a C 2.0
4 b C 4.0
5 c C 6.0
6 a D NaN
7 b D 7.0
8 c D 9.0
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C', 'D']).show()
+---+--------+-----+
| A|variable|value|
+---+--------+-----+
| a| B| 1.0|
| a| C| 2.0|
| a| D| NaN|
| b| B| 3.0|
| b| C| 4.0|
| b| D| 7.0|
| c| B| 5.0|
| c| C| 6.0|
| c| D| 9.0|
+---+--------+-----+
df_long = melt(df_wide, id_vars=['id', 'date'], value_vars=['t1', 't2', 't3', 't4'])
–
Chavers Finally i've found most effective implementation for me. It uses all resources for cluster in my yarn configuration.
from pyspark.sql.functions import explode
def melt(df):
sp = df.columns[1:]
return (df
.rdd
.map(lambda x: [str(x[0]), [(str(i[0]),
float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]],
preservesPartitioning = True)
.toDF()
.withColumn('_2', explode('_2'))
.rdd.map(lambda x: [str(x[0]),
str(x[1][0]),
float(x[1][1] if x[1][1] else 0)],
preservesPartitioning = True)
.toDF()
)
For very wide dataframe I've got performance decreasing at _vars_and_vals generation from user6910411 answer.
It was useful to implement melting via selectExpr
columns=['a', 'b', 'c', 'd', 'e', 'f']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
+---+---+---+---+---+---+
| a| b| c| d| e| f|
+---+---+---+---+---+---+
| 1| 2| 3| 4| 5| 6|
| 4| 5| 6| 7| 9| 8|
| 7| 8| 9| 1| 2| 4|
| 8| 3| 9| 8| 7| 4|
+---+---+---+---+---+---+
cols = df.columns[1:]
df.selectExpr('a', "stack({}, {})".format(len(cols), ', '.join(("'{}', {}".format(i, i) for i in cols))))
+---+----+----+
| a|col0|col1|
+---+----+----+
| 1| b| 2|
| 1| c| 3|
| 1| d| 4|
| 1| e| 5|
| 1| f| 6|
| 4| b| 5|
| 4| c| 6|
| 4| d| 7|
| 4| e| 9|
| 4| f| 8|
| 7| b| 8|
| 7| c| 9|
...
Use list comprehension to create struct column of column names and col values and explode the new column using the magic inline. Code below;
melted_df=(df.withColumn(
#Create struct of column names and corresponding values
'tab',F.array(*[F.struct(lit(x).alias('var'),F.col(x).alias('val'))for x in df.columns if x!='A'] ))
#Explode the column
.selectExpr('A',"inline(tab)")
)
melted_df.show()
+---+---+---+
| A|var|val|
+---+---+---+
| a| B| 1|
| a| C| 2|
| b| B| 3|
| b| C| 4|
| c| B| 5|
| c| C| 6|
+---+---+---+
1)
Copy & paste
2)
Change the first 2 variables
to_melt = {'latin', 'greek', 'chinese'}
new_names = ['lang', 'letter']
melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
*(set(df.columns) - to_melt),
F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
)
null
is created if some values contain null. To remove it, add this:
.filter(f"!{new_names[1]} is null")
Full test:
from pyspark.sql import functions as F
df = spark.createDataFrame([(101, "A", "Σ", "西"), (102, "B", "Ω", "诶")], ['ID', 'latin', 'greek', 'chinese'])
df.show()
# +---+-----+-----+-------+
# | ID|latin|greek|chinese|
# +---+-----+-----+-------+
# |101| A| Σ| 西|
# |102| B| Ω| 诶|
# +---+-----+-----+-------+
to_melt = {'latin', 'greek', 'chinese'}
new_names = ['lang', 'letter']
melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
*(set(df.columns) - to_melt),
F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
)
df.show()
# +---+-------+------+
# | ID| lang|letter|
# +---+-------+------+
# |101| latin| A|
# |101| greek| Σ|
# |101|chinese| 西|
# |102| latin| B|
# |102| greek| Ω|
# |102|chinese| 诶|
# +---+-------+------+
© 2022 - 2024 — McMap. All rights reserved.