Spark SQL: how to explode without losing null values

I have a Dataframe that I am trying to flatten. As part of the process, I want to explode it, so if I have a column of arrays, each value of the array will be used to create a separate row. For instance,

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]

should become

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

This is my code:

private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}

The problem is that in my data, some of the array columns have nulls. In that case, the entire row is deleted. So this dataframe:

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null

becomes

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

instead of

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
2  | Lucy | null

How can I explode my arrays so that I don't lose the null rows?

I am using Spark 1.5.2 and Java 8

Fingertip answered 28/9, 2016 at 5:57 Comment(0)

Spark 2.2+

You can use explode_outer function:

import org.apache.spark.sql.functions.explode_outer

df.withColumn("likes", explode_outer($"likes")).show

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+

Spark <= 2.1

This is Scala, but the Java equivalent should be almost identical (to import individual functions, use import static).

import org.apache.spark.sql.functions.{array, col, explode, lit, when}

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If the array is null, explode an array<string> containing a single null
    .otherwise(array(lit(null).cast("string")))))
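
Since the question uses Java 8 and Spark 1.5.2, a rough Java sketch of the same trick for the simple array<string> case might look like this (an illustration under those assumptions, not part of the original answer; likes is the column from the question):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.DataFrame;

// If the array is null, substitute an array<string> holding a single null before exploding
DataFrame result = df.withColumn("likes",
    explode(
        when(col("likes").isNotNull(), col("likes"))
            .otherwise(array(lit(null).cast("string")))));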

The idea here is basically to replace NULL with an array(NULL) of the desired type. For complex types (i.e. structs) you have to provide the full schema:

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val st = StructType(Seq(
  StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))

or

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))

Note:

If the array column has been created with containsNull set to false, you should change this first (tested with Spark 2.1):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))
Nubbin answered 28/9, 2016 at 6:8 Comment(9)
That looks great, thank you! I have a follow-up question: what if my column type is a StructType? I tried using cast(new StructType()), but I got data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type. I'm trying to make my method as generic as possible, so it fits all column types. – Fingertip
Also, to get the column type, I'm using DataFrame.dtypes(). Is there a better way of getting the column types? – Fingertip
a) You have to provide the full schema with all fields. b) dtypes or schema. – Nubbin
coalesce instead of case-when is more concise and should work like a charm (a sketch of this follows these comments). – Lucius
Doing an explode this way will make the column not nullable. But since we are inserting nulls into the same column, this will result in an NPE; how to counter this? Can we convert that column to a nullable column somehow? – Bookseller
@Nubbin can you provide the PySpark version for Spark <= 2.1 as well? – Livable
@Livable It will be virtually identical, correcting for small syntax differences (like isNotNull() instead of isNotNull). – Nubbin
I can't find explode_outer in PySpark 2.2.1? – Capitalism
Thank you. I found this out after trying to find what was eating up most of my data even when I was doing no filtering. But isn't this a bug in explode? The descriptions of explode and explode_outer are identical in the official documentation: spark.apache.org/docs/2.3.0/api/sql/index.html#explode_outer – Glycosuria
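
As an illustration of that coalesce suggestion (a sketch, not from the original answer; same likes column and Spark 1.x Java API as above):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.DataFrame;

// coalesce returns the first non-null argument, so a null array is replaced
// by an array<string> holding a single null before the explode
DataFrame result = df.withColumn("likes",
    explode(coalesce(col("likes"), array(lit(null).cast("string")))));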

You can use the explode_outer() function.
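
For example, a minimal sketch against the DataFrame from the question (not part of the original answer; explode_outer is available from Spark 2.2, where the Java type is Dataset<Row>):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode_outer;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// explode_outer keeps rows whose array is null or empty, emitting null instead of dropping them
Dataset<Row> exploded = df.withColumn("likes", explode_outer(col("likes")));
exploded.show();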

Digitalis answered 25/2, 2019 at 15:31 Comment(0)

Following up on the accepted answer: when the array elements are a complex type, it can be difficult to define the type by hand (e.g. with large structs).

To do it automatically I wrote the following helper method:

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{array, col, explode, lit, size, when}
import org.apache.spark.sql.types.ArrayType

def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = {
  // Map each array column name to its ArrayType so the null placeholder can be cast correctly
  val arrayFields = df.schema.fields
    .map(field => field.name -> field.dataType)
    .collect { case (name, arrayType: ArrayType) => name -> arrayType }
    .toMap

  columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) =>
    dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol))
      .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))
  }
}

Edit: it seems that Spark 2.2 and newer have this built in.

Mell answered 9/2, 2018 at 9:37 Comment(2)
How is this explodeOuter def to be used in code? – Fewness
You have to pass the dataframe that you want to explode, and the column: explodeOuter(df, List("array_column")) – Mell

To handle an empty map-type column (for Spark <= 2.1):

import org.apache.spark.sql.functions._

val df = List(
  (1, Array(2, 3, 4), Map(1 -> "a")),
  (2, Array(5, 6, 7), Map(2 -> "b")),
  (3, Array[Int](), Map[Int, String]())).toDF("col1", "col2", "col3")

df.select('col1, explode(when(size(map_keys('col3)) === 0, map(lit("null"), lit("null")))
  .otherwise('col3))).show()
Quid answered 29/1, 2020 at 20:51 Comment(0)
from pyspark.sql.functions import *

def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    flat_df = nested_df.select(flat_cols +
                               [col(nc + '.' + c).alias(nc + '_' + c)
                                for nc in nested_cols
                                for c in nested_df.select(nc + '.*').columns])
    print("flatten_df_count :", flat_df.count())
    return flat_df

def explode_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct' and c[1][:5] != 'array']
    array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
    for array_col in array_cols:
        # Replace a null array with an array holding a single null of the same element type,
        # so the explode below keeps the row instead of dropping it
        schema = nested_df.select(array_col).dtypes[0][1]
        nested_df = nested_df.withColumn(array_col, when(col(array_col).isNotNull(), col(array_col)).otherwise(array(lit(None)).cast(schema)))
    # Zip all array columns together, explode once, then flatten the zipped struct back out
    nested_df = nested_df.withColumn("tmp", arrays_zip(*array_cols)).withColumn("tmp", explode("tmp")).select([col("tmp."+c).alias(c) for c in array_cols] + flat_cols)
    print("explode_dfs_count :", nested_df.count())
    return nested_df


new_df = flatten_df(myDf)
while True:
    array_cols = [c[0] for c in new_df.dtypes if c[1][:5] == 'array']
    if len(array_cols):
        new_df = flatten_df(explode_df(new_df))
    else:
        break
    
new_df.printSchema()

I used arrays_zip and explode to do it faster and to address the null issue (note that arrays_zip is available from Spark 2.4).

Donetsk answered 23/6, 2021 at 5:36 Comment(0)

Explode:

  • The explode function is used to create a new row for each element within an array or map column.
  • When applied to an array, it generates a new column (named “col” by default when no alias is given) containing one array element per row.
  • If you have an array of structs, explode will create separate rows for each struct element.
  • Rows whose array or map is null or empty are dropped from the output.
  • Example:
    • Given a DataFrame with an array column “likes”:

       id | name | likes
       -------------------
       1  | Luke | [baseball, soccer]
       2  | Lucy | null
       3  | Doug | []
      
    • Applying explode to the “likes” column results in:

      from pyspark.sql import functions as f
      from pyspark.sql.functions import explode
      
      df.withColumn('likes', explode(f.col('likes'))).show()
      
      id | name | likes
      -------------------
      1  | Luke | baseball
      1  | Luke | soccer
      

Explode Outer:

  • The explode_outer function returns a row for every element in the array or map, and also keeps rows where the source is null or empty.
  • Unlike explode, it does not drop those rows; it emits a null value for them instead.
  • Example:
    • Using the same DataFrame as above:

      id | name | likes
      -------------------
      1  | Luke | [baseball, soccer]
      2  | Lucy | null
      3  | Doug | []
      
    • Applying explode_outer to the “likes” column results in:

       from pyspark.sql import functions as f
       from pyspark.sql.functions import explode_outer
      
       df.withColumn('likes', explode_outer(f.col('likes'))).show()
      
      id | name | likes
      -------------------
      1  | Luke | baseball
      1  | Luke | soccer
      2  | Lucy | null
      3  | Doug | null
      

In summary:

  • Use explode when you want to break down an array into individual records, excluding null or empty values.
  • Use explode_outer when you need all values from the array or map, including null or empty ones.
Stichous answered 14/3 at 17:45 Comment(0)
