Spark UDF for StructType / Row
Asked Answered
V

3

25

I have a "StructType" column in spark Dataframe that has an array and a string as sub-fields. I'd like to modify the array and return the new column of the same type. Can I process it with UDF? Or what are the alternatives?

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val sub_schema = StructType(StructField("col1",ArrayType(IntegerType,false),true) :: StructField("col2",StringType,true)::Nil)
val schema = StructType(StructField("subtable", sub_schema,true) :: Nil)
val data = Seq(Row(Row(Array(1,2),"eb")),  Row(Row(Array(3,2,1), "dsf")) )
val rd = sc.parallelize(data)
val df = spark.createDataFrame(rd, schema)
df.printSchema

root
 |-- subtable: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)

It seems that I need a UDF of the type Row, something like

val u =  udf((x:Row) => x)
       >> Schema for type org.apache.spark.sql.Row is not supported

This makes sense, since Spark does not know the schema for the return type. Unfortunately, udf.register fails too:

spark.udf.register("foo", (x:Row)=> Row, sub_schema)
     <console>:30: error: overloaded method value register with alternatives: ...
Vertigo answered 21/3, 2017 at 15:43 Comment(1)
Why a UDF of the type Row would work? How does the type of UDF of Spark is inferred?Rozek
V
20

turns out you can pass the result schema as a second UDF parameter:

val u =  udf((x:Row) => x, sub_schema)
Vertigo answered 28/3, 2017 at 18:21 Comment(1)
What if you want to use the schema of x? Supposing x is a GenericRowWithSchema.Soper
D
8

You are on the right track. In this scenario UDF will make your life easy. As you have already encountered, UDF can not return types which spark does not know about. So basically you will need return something which spark can easily serialize. It may be a case class or you can return a tuple like (Seq[Int], String) . So here is a modified version of your code:

def main(args: Array[String]): Unit = {
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types._
  val sub_schema = StructType(StructField("col1", ArrayType(IntegerType, false), true) :: StructField("col2", StringType, true) :: Nil)
  val schema = StructType(StructField("subtable", sub_schema, true) :: Nil)
  val data = Seq(Row(Row(Array(1, 2), "eb")), Row(Row(Array(3, 2, 1), "dsf")))
  val rd = spark.sparkContext.parallelize(data)
  val df = spark.createDataFrame(rd, schema)

  df.printSchema()
  df.show(false)

  val mapArray = (subRows: Row) => {
    // I prefer reading values from row by specifying column names, you may use index also
    val col1 = subRows.getAs[Seq[Int]]("col1")
    val mappedCol1 = col1.map(x => x * x) // Use map based on your requirements
    (mappedCol1, subRows.getAs[String]("col2")) // now mapping is done for col2
  }
  val mapUdf = udf(mapArray)

  val newDf = df.withColumn("col1_mapped", mapUdf(df("subtable")))
  newDf.show(false)
  newDf.printSchema()
}

Please take a look at these links, these may give you more insight.

  1. Most comprehensive answer on working with complex schema: https://mcmap.net/q/197313/-querying-spark-sql-dataframe-with-complex-types
  2. Spark supported data types: https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
Denticle answered 21/3, 2017 at 19:37 Comment(0)
M
6

Yes you can do this with UDF. For simplicity, I took your example with case classes and I changed the array by adding 2 to every value :

case class Root(subtable: Subtable)
case class Subtable(col1: Seq[Int], col2: String)

val df = spark.createDataFrame(Seq(
  Root(Subtable(Seq(1, 2, 3), "toto")),
  Root(Subtable(Seq(10, 20, 30), "tata"))
))

val myUdf = udf((subtable: Row) =>
  Subtable(subtable.getSeq[Int](0).map(_ + 2), subtable.getString(1))
)
val result = df.withColumn("subtable_new", myUdf(df("subtable")))
result.printSchema()
result.show(false)

will print :

root
 |-- subtable: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)
 |-- subtable_new: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)

+-------------------------------+-------------------------------+
|subtable                       |subtable_new                   |
+-------------------------------+-------------------------------+
|[WrappedArray(1, 2, 3),toto]   |[WrappedArray(3, 4, 5),toto]   |
|[WrappedArray(10, 20, 30),tata]|[WrappedArray(12, 22, 32),tata]|
+-------------------------------+-------------------------------+
Matta answered 21/3, 2017 at 19:0 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.