How to explode multiple columns of a dataframe in PySpark
I have a dataframe whose columns contain lists, similar to the following. The lists are not all the same length.

Name  Age  Subjects                  Grades
[Bob] [16] [Maths,Physics,Chemistry] [A,B,C]

I want to explode the dataframe so that I get the following output:

Name Age Subjects Grades
Bob  16   Maths     A
Bob  16  Physics    B
Bob  16  Chemistry  C

How can I achieve this?

Jobi answered 28/6, 2018 at 12:19 Comment(2)
You want to match the index in a given array with other arrays in the row? Like how Maths -> A, Physics -> B, and Chemistry -> C. So something like Maths -> B would be wrong.Laughter
Yes @Laughter that would be wrongJobi
This works,

import pyspark.sql.functions as F
from pyspark.sql.types import *

df = spark.createDataFrame(
    [(['Bob'], [16], ['Maths','Physics','Chemistry'], ['A','B','C'])],
    ['Name','Age','Subjects', 'Grades'])
df.show()

+-----+----+--------------------+---------+
| Name| Age|            Subjects|   Grades|
+-----+----+--------------------+---------+
|[Bob]|[16]|[Maths, Physics, ...|[A, B, C]|
+-----+----+--------------------+---------+

Use a udf with zip. The columns to be exploded have to be merged into one column before exploding.

combine = F.udf(lambda x, y: list(zip(x, y)),
                ArrayType(StructType([StructField("subs", StringType()),
                                      StructField("grades", StringType())])))

df = df.withColumn("new", combine("Subjects", "Grades"))\
       .withColumn("new", F.explode("new"))\
       .select("Name", "Age", F.col("new.subs").alias("Subjects"), F.col("new.grades").alias("Grades"))
df.show()


+-----+----+---------+------+
| Name| Age| Subjects|Grades|
+-----+----+---------+------+
|[Bob]|[16]|    Maths|     A|
|[Bob]|[16]|  Physics|     B|
|[Bob]|[16]|Chemistry|     C|
+-----+----+---------+------+
Subkingdom answered 28/6, 2018 at 14:14 Comment(2)
what should I do if I need to put A B and C in different columns rather than rowsRough
UDFs are not efficient or performant. They should be avoided if a PySpark API solution exists.Gemmiparous
PySpark added an arrays_zip function in version 2.4, which eliminates the need for a Python UDF to zip the arrays.

import pyspark.sql.functions as F

df = spark.createDataFrame(
    [(['Bob'], [16], ['Maths','Physics','Chemistry'], ['A','B','C'])],
    ['Name','Age','Subjects', 'Grades'])
df = df.withColumn("new", F.arrays_zip("Subjects", "Grades"))\
       .withColumn("new", F.explode("new"))\
       .select("Name", "Age", F.col("new.Subjects").alias("Subjects"), F.col("new.Grades").alias("Grades"))
df.show()

+-----+----+---------+------+
| Name| Age| Subjects|Grades|
+-----+----+---------+------+
|[Bob]|[16]|    Maths|     A|
|[Bob]|[16]|  Physics|     B|
|[Bob]|[16]|Chemistry|     C|
+-----+----+---------+------+
Burnight answered 3/9, 2019 at 9:55 Comment(0)
R
14

Arriving late to the party :-)

The simplest way is to use inline, which doesn't have a Python API but is supported by selectExpr.

df.selectExpr('Name[0] as Name','Age[0] as Age','inline(arrays_zip(Subjects,Grades))').show()

+----+---+---------+------+
|Name|Age| Subjects|Grades|
+----+---+---------+------+
| Bob| 16|    Maths|     A|
| Bob| 16|  Physics|     B|
| Bob| 16|Chemistry|     C|
+----+---+---------+------+
Ravishing answered 2/2, 2022 at 13:7 Comment(0)
Have you tried this?

df.select(explode(col("Subjects")).alias("Subjects")).show()

(split is only needed when the column is a delimited string rather than an array, and it takes a pattern as its second argument, e.g. split(col("Subjects"), ",").)

Alternatively, you can convert the dataframe to an RDD and use a flatMap function to separate the Subjects.
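That flatMap route can be sketched as below. This is a sketch, not the answerer's code: explode_row is a hypothetical helper that assumes the question's row shape, and the commented Spark lines assume a SparkSession named spark.

```python
# Pure-Python helper used by flatMap: pairs Subjects and Grades element-wise.
# Assumes the question's row shape: ([Name], [Age], [Subjects...], [Grades...]).
def explode_row(row):
    name, age = row[0][0], row[1][0]  # unwrap the single-element lists
    return [(name, age, s, g) for s, g in zip(row[2], row[3])]

# With a SparkSession named `spark` (assumption), the RDD route would be:
# exploded = df.rdd.flatMap(explode_row).toDF(['Name', 'Age', 'Subjects', 'Grades'])
```

Because zip stops at the shorter list, a row whose arrays differ in length silently drops the trailing elements, which matches the pairing the question asks for.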

Sanitize answered 28/6, 2018 at 12:25 Comment(1)
I've tried using a flat map as df.rdd.flatMap(lambda x: zip(*[x[c] for c in dcols])).toDF(dcols) but it is only giving me the first row and ignoring the remaining rows- |16 |A |Bob |Maths |Jobi
I've used the very elegant solution from @Nasty, but if you have a lot of columns to explode, the scheduler on the server side might run into issues when you generate lots of new dataframes with withColumn(). So I slightly adapted the code to run more efficiently and be more convenient to use:

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def explode_all(df: DataFrame, index=True, cols: list = []):
    """Explode multiple array-type columns.

    Loops through the explodable signals (array-type columns) and explodes them together.
    First, the columns are zipped into the df:
    [a, [1, 2, 3]], [b, [10, 20, 30]] -> [[[a, 1], [b, 10]], [[a, 2], [b, 20]], [[a, 3], [b, 30]]]
    Then the zipped struct is unpacked into separate columns.

    df -- Required: input dataframe

    index -- Optional: whether an index column is required (default: True)

    cols -- Optional: list of column names to explode.
    """
    if not isinstance(df, DataFrame):           # Error handling
        raise TypeError("Input data must be a PySpark DataFrame")

    if cols:
        expl_cols = [col for col, col_type in df.dtypes if (col_type[:5] == 'array' and col in cols)]
    else:
        expl_cols = [col for col, col_type in df.dtypes if col_type[:5] == 'array']

    if len(expl_cols) == 0:                     # Error handling
        raise ValueError("Input data does not contain any array columns")

    if index:
        df = df.select("*", F.posexplode(F.arrays_zip(*expl_cols)).alias("signal_index", "exp_combo")).drop(*expl_cols)
    else:
        df = df.select("*", F.explode(F.arrays_zip(*expl_cols)).alias("exp_combo")).drop(*expl_cols)

    df = df.select("*", *[F.col('exp_combo.' + name).alias(name) for name in expl_cols]).drop('exp_combo')

    return df
Hirsute answered 8/12, 2023 at 9:38 Comment(0)
Here's a copy/paste function for when you need to repeat this quickly and easily across a large number of columns in a dataset:

import pyspark.sql.functions as f

cols = ["word", "stem", "pos", "ner"]

def explode_cols(data, cols):
    data = data.withColumn('exp_combo', f.arrays_zip(*cols))
    data = data.withColumn('exp_combo', f.explode('exp_combo'))
    for col in cols:
        data = data.withColumn(col, f.col('exp_combo.' + col))

    return data.drop('exp_combo')

result = explode_cols(data, cols)

You're welcome :)

Corrosive answered 7/4, 2022 at 23:4 Comment(2)
arrays_zip does not take list as inputBluegrass
@Quynh-MaiChu you need to use a starred expression as written in the function above f.arrays_zip(*cols).. so that python will process your list as arguments instead of a list object. you can also do ..._zip(f.col('col1'), f.col('col2')) if you find that clearerCorrosive
When exploding multiple columns, the above solutions come in handy only when the arrays have the same length. If they don't, it is better to explode them separately and take distinct values each time.

df = spark.createDataFrame(
    [(['Bob'], [16], ['Maths','Physics','Chemistry'], ['A','B','C'])],
    ['Name','Age','Subjects', 'Grades'])

df = df.withColumn('Subjects',F.explode('Subjects')).select('Name','Age','Subjects', 'Grades').distinct()

df = df.withColumn('Grades',F.explode('Grades')).select('Name','Age','Subjects', 'Grades').distinct()

df.show()

+----+---+---------+------+
|Name|Age| Subjects|Grades|
+----+---+---------+------+
| Bob| 16|    Maths|     A|
| Bob| 16|  Physics|     B|
| Bob| 16|Chemistry|     C|
+----+---+---------+------+
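For reference, when the arrays do have different lengths, arrays_zip pads the shorter one with null rather than failing, much like Python's itertools.zip_longest. A plain-Python illustration (not Spark code):

```python
from itertools import zip_longest

# zip_longest mirrors arrays_zip's padding behaviour: the shorter
# list is padded with None (null in Spark) instead of being truncated.
subjects = ['Maths', 'Physics', 'Chemistry']
grades = ['A', 'B']  # one grade missing

pairs = list(zip_longest(subjects, grades))
# pairs == [('Maths', 'A'), ('Physics', 'B'), ('Chemistry', None)]
```

So an arrays_zip-based explode still works on unequal lengths; the extra rows simply carry nulls in the shorter column.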
Alcina answered 19/11, 2022 at 18:36 Comment(0)
Thanks @nasty for saving the day. Just small tweaks to get the code working.

from pyspark.sql.functions import arrays_zip, explode, col

def explode_cols(df, cl):
    df = df.withColumn('exp_combo', arrays_zip(*cl))
    df = df.withColumn('exp_combo', explode('exp_combo'))
    for colm in cl:
        df = df.withColumn(colm, col('exp_combo.' + colm))
    return df.drop('exp_combo')
Wieldy answered 1/1, 2023 at 10:41 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.