Spark SQL nested withColumn

I have a DataFrame with multiple columns, some of which are structs. Something like this:

root
 |-- foo: struct (nullable = true)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)
 |-- abc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- def: struct (nullable = true)
 |    |    |    |-- a: string (nullable = true)
 |    |    |    |-- b: integer (nullable = true)
 |    |    |    |-- c: string (nullable = true)

I want to apply a UserDefinedFunction to the column baz, replacing baz with a function of baz, but I cannot figure out how to do that. Here is an example of the desired output (note that baz is now an int):

root
 |-- foo: struct (nullable = true)
 |    |-- bar: string (nullable = true)
 |    |-- baz: integer (nullable = true)
 |-- abc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- def: struct (nullable = true)
 |    |    |    |-- a: string (nullable = true)
 |    |    |    |-- b: integer (nullable = true)
 |    |    |    |-- c: string (nullable = true)

It looks like DataFrame.withColumn only works on top-level columns, not on nested columns. I'm using Scala for this problem.

Can someone help me out with this?

Thanks
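
For anyone who wants to try the answers below, here is a minimal sketch that builds a DataFrame with the same field names and types as the schema above. The sample values are made up, and because it uses Spark SQL's named_struct and array functions, the nullability flags printed by printSchema will differ from the question's (for example, foo is reported as nullable = false).

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimenting; in spark-shell the existing `spark` is returned
val spark = SparkSession.builder().master("local[*]").appName("nested-columns").getOrCreate()

// Hypothetical sample row with the question's shape:
//   foo: struct<bar: string, baz: string>
//   abc: array<struct<def: struct<a: string, b: int, c: string>>>
val df = spark.sql("""
  SELECT
    named_struct('bar', 'Hi', 'baz', 'There') AS foo,
    array(named_struct('def', named_struct('a', 'x', 'b', 1, 'c', 'y'))) AS abc
""")

df.printSchema()
```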

Man answered 29/6, 2017 at 17:44

That's easy: just use a dot to select nested fields, e.g. $"foo.baz":

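import org.apache.spark.sql.functions._ // udf, struct, col
import spark.implicits._ // toDF and $"..." (already imported in spark-shell)

case class Foo(bar: String, baz: String)
case class Record(foo: Foo)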

val df = Seq(
   Record(Foo("Hi","There"))
).toDF()


df.printSchema

root
 |-- foo: struct (nullable = true)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)


val myUDF = udf((s: String) => {
  // do something with s
  s.toUpperCase
})


df
.withColumn("udfResult",myUDF($"foo.baz"))
.show

+----------+---------+
|       foo|udfResult|
+----------+---------+
|[Hi,There]|    THERE|
+----------+---------+

If you want to add the result of your UDF to the existing struct foo, i.e. to get:

root
 |-- foo: struct (nullable = false)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)
 |    |-- udfResult: string (nullable = true)

There are two options:

With withColumn:

df
.withColumn("udfResult",myUDF($"foo.baz"))
.withColumn("foo",struct($"foo.*",$"udfResult"))
.drop($"udfResult")

With select:

df
.select(struct($"foo.*",myUDF($"foo.baz").as("udfResult")).as("foo"))

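EDIT: To replace the existing field in the struct with the result of the UDF: unfortunately, this does not work, because withColumn treats "foo.baz" as the literal name of a new top-level column rather than as a path into foo:

df
.withColumn("foo.baz",myUDF($"foo.baz"))

It can, however, be done like this: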

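// get all fields of foo except baz
val structCols = df.select($"foo.*")
    .columns
    .filter(_ != "baz")
    .map(name => col("foo." + name))

// rebuild foo from the remaining fields plus the UDF result;
// note that baz ends up as the last field of the struct
df.withColumn(
    "foo",
    struct((structCols :+ myUDF($"foo.baz").as("baz")): _*)
)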
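
One side effect of the approach above is that baz moves to the end of the struct, because it is appended after the other fields. A minimal sketch that keeps the original field order instead maps over the existing field names and swaps in the UDF result only for baz. The toLength UDF is a hypothetical stand-in that turns the string into its length, to mirror the question's string-to-int change, and the sample data is made up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("nested-replace").getOrCreate()
import spark.implicits._

// Made-up sample data: foo is struct<bar: string, baz: string>
val df = Seq(("Hi", "There")).toDF("bar", "baz").select(struct($"bar", $"baz").as("foo"))

// Hypothetical UDF (String => Int); wrapping in Option turns a null input into a null output
val toLength = udf((s: String) => Option(s).map(_.length))

// Rebuild foo field by field, in the original order, replacing only baz
val fooFields = df.select($"foo.*").columns.toSeq
val newFoo = struct(fooFields.map {
  case "baz" => toLength($"foo.baz").as("baz")
  case other => col(s"foo.$other")
}: _*)

val result = df.withColumn("foo", newFoo)
result.printSchema() // baz is now an integer field, still placed after bar
```

The pattern match keeps the field list generic, so it works the same way for structs with more fields than this example.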
Faruq answered 29/6, 2017 at 19:07
@RaphaelRoth what are col and struct? I can't find these classes in Spark. - Stowe
@Gaurav Shah import org.apache.spark.sql.functions._ - Faruq
Any idea how to get the replacement to work with the array element, i.e. the same as above but for abc.a rather than foo.baz? - Reifel
Is there any solution to Breandan's question above? Thank you. - Pinto
What should I do if there is one more level of hierarchy between foo and baz? - Methuselah
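
For the two follow-up questions (a field inside a struct inside an array, and an extra level of nesting), newer Spark versions offer a built-in route. A minimal sketch, assuming Spark 3.1 or later: functions.transform (available in the Scala API since Spark 3.0) rewrites each array element, and Column.withField (Spark 3.1+) accepts a dotted path such as "def.a" for a field nested one level deeper. The sample data and the upperUdf UDF are hypothetical; the column names follow the question's abc schema.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("nested-array-replace").getOrCreate()
import spark.implicits._

// Made-up data shaped like the question's abc column:
//   abc: array<struct<def: struct<a: string, b: int, c: string>>>
val df = spark.sql(
  "SELECT array(named_struct('def', named_struct('a', 'one', 'b', 1, 'c', 'x'))) AS abc")

// Hypothetical UDF; Option keeps null inputs as null
val upperUdf = udf((s: String) => Option(s).map(_.toUpperCase))

// For every element e of abc, replace e.def.a and leave def.b and def.c untouched
val result = df.withColumn("abc", transform($"abc", e =>
  e.withField("def.a", upperUdf(e.getField("def").getField("a")))))

result.show(false)
```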

You can do this using the struct function, as Raphael Roth has already demonstrated in their answer above. There is an easier way, though, using the Make Structs Easy* library. The library adds a withField method to the Column class, allowing you to add/replace columns inside a StructType column in much the same way as the withColumn method on the DataFrame class allows you to add/replace columns inside a DataFrame. For your specific use case, you could do something like this:

import org.apache.spark.sql.functions._
import com.github.fqaiser94.mse.methods._

// generate some fake data
case class Foo(bar: String, baz: String)
case class Record(foo: Foo, arrayOfFoo: Seq[Foo])

val df = Seq(
   Record(Foo("Hello", "World"), Seq(Foo("Blue", "Red"), Foo("Green", "Yellow")))
).toDF

df.printSchema

// root
//  |-- foo: struct (nullable = true)
//  |    |-- bar: string (nullable = true)
//  |    |-- baz: string (nullable = true)
//  |-- arrayOfFoo: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- bar: string (nullable = true)
//  |    |    |-- baz: string (nullable = true)

df.show(false)

// +--------------+------------------------------+
// |foo           |arrayOfFoo                    |
// +--------------+------------------------------+
// |[Hello, World]|[[Blue, Red], [Green, Yellow]]|
// +--------------+------------------------------+

// example user defined function that capitalizes a given string
val myUdf = udf((s: String) => s.toUpperCase)

// capitalize value of foo.baz
df.withColumn("foo", $"foo".withField("baz", myUdf($"foo.baz"))).show(false)

// +--------------+------------------------------+
// |foo           |arrayOfFoo                    |
// +--------------+------------------------------+
// |[Hello, WORLD]|[[Blue, Red], [Green, Yellow]]|
// +--------------+------------------------------+

I noticed a follow-up question in the comments about replacing a column nested inside a struct nested inside an array. This can also be done by combining the functions provided by the Make Structs Easy library with the functions provided by the spark-hofs library, as follows:

import za.co.absa.spark.hofs._

// capitalize the value of foo.baz in each element of arrayOfFoo
df.withColumn("arrayOfFoo", transform($"arrayOfFoo", foo => foo.withField("baz", myUdf(foo.getField("baz"))))).show(false)

// +--------------+------------------------------+
// |foo           |arrayOfFoo                    |
// +--------------+------------------------------+
// |[Hello, World]|[[Blue, RED], [Green, YELLOW]]|
// +--------------+------------------------------+

*Full disclosure: I am the author of the Make Structs Easy library that is referenced in this answer.

Griseofulvin answered 14/3, 2020 at 21:13
Since Spark 3.1, a withField method has been added to the Column API. - Embrocation
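
To illustrate that comment, here is a minimal sketch of the original question on Spark 3.1+, where the built-in Column.withField replaces baz inside foo without rebuilding the struct by hand, keeping baz in its original position. The toLength UDF is a hypothetical stand-in that maps the string to its length, so baz ends up as an integer as in the question's desired schema; the data is made up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("with-field").getOrCreate()
import spark.implicits._

// Made-up data: foo is struct<bar: string, baz: string>
val df = spark.sql("SELECT named_struct('bar', 'Hi', 'baz', 'There') AS foo")

// Hypothetical UDF (String => Int); a null input gives a null output
val toLength = udf((s: String) => Option(s).map(_.length))

// Spark 3.1+: replace foo.baz in place; the new field type follows the UDF's return type
val result = df.withColumn("foo", $"foo".withField("baz", toLength($"foo.baz")))

result.printSchema()
```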
