Remove null from array columns in Dataframe in Scala with Spark (1.6)

I have a DataFrame with an "id" column and a "desc" column that holds an array of structs. The schema:

root
 |-- id: string (nullable = true)
 |-- desc: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: long (nullable = false)

The "desc" array can contain any number of null values. Using Spark 1.6, I would like to produce a final DataFrame in which the array has no null values.

An example would be:

id   .   desc
1010 .   [[George,21],null,[MARIE,13],null]
1023 .   [null,[Watson,11],[John,35],null,[Kyle,33]]

I want the final dataframe as:

id   .   desc
1010 .   [[George,21],[MARIE,13]]
1023 .   [[Watson,11],[John,35],[Kyle,33]]

I tried doing this with a UDF and a case class, but got:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to....

Any help is greatly appreciated. I would prefer to do this without converting to RDDs if possible.

Grille answered 7/5, 2018 at 13:17 Comment(0)

Given that the original DataFrame has the following schema:

root
 |-- id: string (nullable = true)
 |-- desc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: long (nullable = false)

Defining a UDF that removes the null values from the array should work for you:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

// Drop null entries, then rebuild each remaining Row as an `element` so Spark can infer the returned struct type.
val removeNull = udf((array: Seq[Row]) => array.filterNot(_ == null)
  .map(x => element(x.getAs[String]("name"), x.getAs[Long]("age"))))

df.withColumn("desc", removeNull(col("desc")))

where element is a case class

case class element(name: String, age: Long)

and you should get

+----+-----------------------------------+
|id  |desc                               |
+----+-----------------------------------+
|1010|[[George,21], [MARIE,13]]          |
|1023|[[Watson,11], [John,35], [Kyle,33]]|
+----+-----------------------------------+
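
To see what each of the two steps contributes, here is a minimal plain-Scala sketch that runs without Spark. It assumes a Map[String, Any] as a stand-in for Spark's Row; FakeRow, Element and FilterThenMap are hypothetical names used only for this illustration, not Spark APIs. filterNot drops the null entries, and map turns each remaining record into a case class instance, which mirrors what the UDF above does with element.

```scala
// Hypothetical stand-in for the answer's `element` case class.
case class Element(name: String, age: Long)

object FilterThenMap {
  // Assumption: a Map from field name to value plays the role of Spark's Row.
  type FakeRow = Map[String, Any]

  def removeNull(array: Seq[FakeRow]): Seq[Element] =
    array
      .filterNot(_ == null) // step 1: drop the null entries
      .map(r => Element(    // step 2: rebuild each record as a typed value
        r("name").asInstanceOf[String],
        r("age").asInstanceOf[Long]))

  def main(args: Array[String]): Unit = {
    val desc: Seq[FakeRow] = Seq(
      Map("name" -> "George", "age" -> 21L),
      null,
      Map("name" -> "MARIE", "age" -> 13L),
      null)
    println(removeNull(desc)) // prints List(Element(George,21), Element(MARIE,13))
  }
}
```

In the real UDF the second step matters because Spark derives the result schema from the function's return type, and a Seq of case class instances gives it a struct with named fields.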
Maquis answered 7/5, 2018 at 14:47 Comment(2)
I follow the part up to array.filterNot(_ == null), but why is map needed after that? Could you clarify what the map is doing? - Grille
map just builds the structs (element instances) that the UDF returns. - Maquis

Here is another version:

case class Person(name: String, age: Int)

root
 |-- id: string (nullable = true)
 |-- desc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: integer (nullable = false)

+----+-----------------------------------------------+
|id  |desc                                           |
+----+-----------------------------------------------+
|1010|[[George,21], null, [MARIE,13], null]          |
|1023|[[Watson,11], null, [John,35], null, [Kyle,33]]|
+----+-----------------------------------------------+


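import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val filterOutNull = udf((xs: Seq[Row]) => {
  xs.flatMap {
    // drop null entries
    case null => Nil
    // convert the Row back to your specific struct:
    case Row(s: String, i: Int) => List(Person(s, i))
  }
})

// $"..." needs the SQL context's implicits in scope (e.g. import sqlContext.implicits._)
val result = df.withColumn("filteredListDesc", filterOutNull($"desc"))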

+----+-----------------------------------------------+-----------------------------------+
|id  |desc                                           |filteredListDesc                   |
+----+-----------------------------------------------+-----------------------------------+
|1010|[[George,21], null, [MARIE,13], null]          |[[George,21], [MARIE,13]]          |
|1023|[[Watson,11], null, [John,35], null, [Kyle,33]]|[[Watson,11], [John,35], [Kyle,33]]|
+----+-----------------------------------------------+-----------------------------------+
Loren answered 7/5, 2018 at 14:51 Comment(2)
Hi there. I have a question: why is Seq[Row] used in the UDF instead of Seq[Person]? Why does using Seq[Person] give a casting error when the case class has the same structure as the one defined in the schema? - Grille
In short, it's easier for Spark to use its internal types like Row/struct/arrays than JVM objects like case classes. - Loren

Spark 3.4+

array_compact($"desc")

Example input:

case class Person(name: String, age: Long)
val df1 = Seq(
    ("1010", Seq(Person("George", 21), null, Person("MARIE", 13), null)),
    ("1023", Seq(null, Person("Watson", 11), Person("John", 35), null, Person("Kyle", 33)))
).toDF("id", "desc")

df1.show(truncate=false)
// +----+--------------------------------------------------+
// |id  |desc                                              |
// +----+--------------------------------------------------+
// |1010|[{George, 21}, null, {MARIE, 13}, null]           |
// |1023|[null, {Watson, 11}, {John, 35}, null, {Kyle, 33}]|
// +----+--------------------------------------------------+

df1.printSchema()
// root
//  |-- id: string (nullable = true)
//  |-- desc: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- name: string (nullable = true)
//  |    |    |-- age: long (nullable = false)

Using array_compact:

import org.apache.spark.sql.functions.array_compact

val df2 = df1.withColumn("desc", array_compact($"desc"))

df2.show(truncate=false)
// +----+--------------------------------------+
// |id  |desc                                  |
// +----+--------------------------------------+
// |1010|[{George, 21}, {MARIE, 13}]           |
// |1023|[{Watson, 11}, {John, 35}, {Kyle, 33}]|
// +----+--------------------------------------+
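
On Spark 2.4 through 3.3, where array_compact is not yet available, the SQL higher-order function filter may be an option instead of a UDF. The following is only a sketch under that assumption: the object name FilterNullsWithExpr and the helper withoutNulls are made up for this example, and the local SparkSession is there just to keep it self-contained.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

case class Person(name: String, age: Long)

object FilterNullsWithExpr {
  // Hypothetical helper: keeps only the non-null elements of the "desc" array.
  // `filter(array, lambda)` is a Spark SQL higher-order function (Spark 2.4+).
  def withoutNulls(df: DataFrame): DataFrame =
    df.withColumn("desc", expr("filter(desc, x -> x is not null)"))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, used only so the sketch runs on its own.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("filter-nulls-sketch")
      .getOrCreate()
    import spark.implicits._

    // Same sample data as in the example input above.
    val df1 = Seq(
      ("1010", Seq(Person("George", 21), null, Person("MARIE", 13), null)),
      ("1023", Seq(null, Person("Watson", 11), Person("John", 35), null, Person("Kyle", 33)))
    ).toDF("id", "desc")

    withoutNulls(df1).show(truncate = false)
    spark.stop()
  }
}
```

This should leave the same non-null elements in each row as the array_compact version, and because the whole operation is a built-in expression there is no Row-to-case-class conversion to write.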
Picco answered 13/5, 2023 at 17:40 Comment(0)
