Derive multiple columns from a single column in a Spark DataFrame

5

53

I have a DataFrame, call it DFA, with a single string column, ColmnA, that holds a large block of parseable metadata.

I would like to break ColmnA into multiple columns through a function, ClassXYZ = Func1(ColmnA). This function returns an instance of a class, ClassXYZ, with multiple variables, and each of these variables has to be mapped to a new column, such as ColmnA1, ColmnA2, etc.

How would I transform one DataFrame into another with these additional columns while calling Func1 just once, rather than repeating the call for every new column?

It's easy to solve if I call this huge function every time I add a new column, but that is what I wish to avoid.

Please advise with working code or pseudocode.

Thanks

Sanjay

Drudge answered 25/8, 2015 at 5:33 Comment(0)
68

Generally speaking, what you want is not directly possible. A UDF can return only a single column at a time. There are two different ways you can overcome this limitation:

  1. Return a column of complex type. The most general solution is a StructType but you can consider ArrayType or MapType as well.

    import org.apache.spark.sql.functions.udf
    
    val df = Seq(
      (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
    ).toDF("x", "y", "z")
    
    case class Foobar(foo: Double, bar: Double)
    
    val foobarUdf = udf((x: Long, y: Double, z: String) => 
      Foobar(x * y, z.head.toInt * y))
    
    val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
    df1.show
    // +---+----+---+------------+
    // |  x|   y|  z|      foobar|
    // +---+----+---+------------+
    // |  1| 3.0|  a| [3.0,291.0]|
    // |  2|-1.0|  b|[-2.0,-98.0]|
    // |  3| 0.0|  c|   [0.0,0.0]|
    // +---+----+---+------------+
    
    df1.printSchema
    // root
    //  |-- x: long (nullable = false)
    //  |-- y: double (nullable = false)
    //  |-- z: string (nullable = true)
    //  |-- foobar: struct (nullable = true)
    //  |    |-- foo: double (nullable = false)
    //  |    |-- bar: double (nullable = false)
    

    This can be easily flattened later but usually there is no need for that.

  2. Switch to RDD, reshape and rebuild DF:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    def foobarFunc(x: Long, y: Double, z: String): Seq[Any] = 
      Seq(x * y, z.head.toInt * y)
    
    val schema = StructType(df.schema.fields ++
      Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))
    
    val rows = df.rdd.map(r => Row.fromSeq(
      r.toSeq ++
      foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))
    
    val df2 = sqlContext.createDataFrame(rows, schema)
    
    df2.show
    // +---+----+---+----+-----+
    // |  x|   y|  z| foo|  bar|
    // +---+----+---+----+-----+
    // |  1| 3.0|  a| 3.0|291.0|
    // |  2|-1.0|  b|-2.0|-98.0|
    // |  3| 0.0|  c| 0.0|  0.0|
    // +---+----+---+----+-----+
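
If top-level columns are needed after option 1, the struct can be expanded with a star selection. The following is a minimal sketch, not part of the original answer: it assumes Spark 2.x or later (it uses `SparkSession` rather than the `sqlContext` shown above), and the object name `FlattenStructSketch` and the method `run` are hypothetical names chosen for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object FlattenStructSketch {
  // Defined at object level so Spark can derive the struct schema from it.
  case class Foobar(foo: Double, bar: Double)

  // Hypothetical helper: builds the sample data, applies the UDF once in the
  // query, then expands the resulting struct into top-level columns.
  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val df = Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")).toDF("x", "y", "z")

    val foobarUdf = udf((x: Long, y: Double, z: String) =>
      Foobar(x * y, z.head.toInt * y))

    df.withColumn("foobar", foobarUdf(col("x"), col("y"), col("z")))
      // "foobar.*" selects every field of the struct as its own column.
      .select(col("x"), col("y"), col("z"), col("foobar.*"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("flatten-struct-sketch")
      .getOrCreate()
    run(spark).show()
    spark.stop()
  }
}
```

The UDF appears only once in the query, but depending on the Spark version the optimizer may still merge the two projections and evaluate it once per extracted field. On Spark 2.3 and later, marking the UDF with `asNondeterministic()` is a commonly suggested way to discourage that; whether it is needed is worth checking in the physical plan with `explain()`.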
    
Ultrasonic answered 26/10, 2015 at 12:23 Comment(3)
When you say "usually there's no [need] for [flattening a column]", why is that? Or does Spark allow most things that you do with top-level columns to also be done with hierarchical data (like df1.foobar.foo)? - Spool
@Spool Because simple structs can be used in pretty much any context where one would normally use a flat structure (with simple dot syntax, foobar.foo). It doesn't apply to collection types though. You can also check https://mcmap.net/q/197313/-querying-spark-sql-dataframe-with-complex-types - Ultrasonic
You could try a different approach when assigning the DataFrame columns, using withColumn as in the example above: val df1 = df.withColumn("foo", foobarUdf($"x", $"y", $"z").getField("foo")).withColumn("bar", foobarUdf($"x", $"y", $"z").getField("bar")) Now the schema has 2 new columns: "foo" and "bar". - Hawkinson
18

Assume that your function returns a sequence of elements, as in the example below:

    val df = sc.parallelize(List(
      ("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27)
    )).toDF("infoComb", "age")
    df.show
    // +------------------+---+
    // |          infoComb|age|
    // +------------------+---+
    // | Mike,1986,Toronto| 30|
    // | Andre,1980,Ottawa| 36|
    // |  jill,1989,London| 27|
    // +------------------+---+

Now you can split the infoComb string to get more columns:

    import org.apache.spark.sql.functions.expr

    df.select(
      expr("(split(infoComb, ','))[0]").cast("string").as("name"),
      expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"),
      expr("(split(infoComb, ','))[2]").cast("string").as("city"),
      $"age"
    ).show
    // +-----+----------+-------+---+
    // | name|yearOfBorn|   city|age|
    // +-----+----------+-------+---+
    // | Mike|      1986|Toronto| 30|
    // |Andre|      1980| Ottawa| 36|
    // | jill|      1989| London| 27|
    // +-----+----------+-------+---+

Hope this helps.
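
The same split can also be written once as a `Column` expression and indexed with `getItem`, which avoids repeating the SQL string for every field. This is a sketch under assumptions rather than the answerer's own code: it assumes Spark 2.x or later with a `SparkSession`, and the object name `SplitOnceSketch` and the method `run` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, split}

object SplitOnceSketch {
  // Hypothetical helper: rebuilds the sample data and derives three columns
  // from a single split expression.
  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val df = Seq(
      ("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27)
    ).toDF("infoComb", "age")

    // The split is written once and reused; each getItem picks one element.
    val parts = split(col("infoComb"), ",")

    df.select(
      parts.getItem(0).as("name"),
      parts.getItem(1).cast("integer").as("yearOfBorn"),
      parts.getItem(2).as("city"),
      col("age"))
  }
}
```

Reusing `parts` keeps the code in one place, but it does not by itself guarantee that Spark evaluates the split only once per row. For a cheap built-in such as `split` that is rarely a concern.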

Whittle answered 13/7, 2016 at 0:46 Comment(1)
Couldn't you just say df.select('infoComb.*', 'age')? The .* on a column name selects each field in the struct as a new column. - Jeff
5

If your resulting columns will be of the same length as the original one, you can create brand new columns with the withColumn function by applying a UDF. After this you can drop your original column, e.g.:

    val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
      .withColumn("newCol2", myFun2(myDf("originalColumn")))
      .drop(myDf("originalColumn"))

where myFun is a UDF defined like this:

    def myFun = udf(
      (originalColumnContent: String) => {
        // do something with your original column content and return a new one
      }
    )
Zipah answered 25/8, 2015 at 7:59 Comment(2)
Hi Niemand, I appreciate your reply, but it does not solve the question. In your code, you are calling the function "myDF" several times, whereas I would like that function to be called once, generating a class with multiple fields, with each field returned as a new column. - Drudge
Well, I'm afraid that I presented the only possible way for now. I don't think that any other way exists, but hopefully I am wrong ;). Also note that I did not call myFun several times - you can call other functions like myFun2, myFun3 etc. to create the columns that you need. - Zipah
2

I opted to create a function to flatten one column and then just call it together with the UDF.

First define this:

    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.types.StructType

    implicit class DfOperations(df: DataFrame) {

      // Replaces struct column `col` with one column per field, named `<col>_<field>`.
      def flattenColumn(col: String) = {
        def addColumns(df: DataFrame, cols: Array[String]): DataFrame = {
          if (cols.isEmpty) df
          else addColumns(
            df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)),
            cols.tail
          )
        }

        val field = df.select(col).schema.fields(0)
        val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name)

        addColumns(df, newCols).drop(col)
      }

      // Adds the struct column and flattens it in one step.
      def withColumnMany(colName: String, col: Column) = {
        df.withColumn(colName, col).flattenColumn(colName)
      }

    }

Then usage is very simple:

    case class MyClass(a: Int, b: Int)

    val df = sc.parallelize(Seq(
      (0),
      (1)
    )).toDF("x")

    val f = udf((x: Int) => MyClass(x*2,x*3))

    df.withColumnMany("test", f($"x")).show()

    //  +---+------+------+
    //  |  x|test_a|test_b|
    //  +---+------+------+
    //  |  0|     0|     0|
    //  |  1|     2|     3|
    //  +---+------+------+
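
The prefixed flattening can also be expressed as a single `select` instead of a chain of `withColumn` calls. The sketch below is an illustration under assumptions, not the answerer's code: it assumes Spark 2.x or later with a `SparkSession`, and `PrefixedFlattenSketch`, `flattenPrefixed` and `run` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.StructType

object PrefixedFlattenSketch {
  case class MyClass(a: Int, b: Int)

  // Replaces struct column `name` with one top-level column per field,
  // named `<name>_<field>`, using a single projection.
  def flattenPrefixed(df: DataFrame, name: String): DataFrame = {
    val fields = df.schema(name).dataType.asInstanceOf[StructType].fieldNames
    val kept = df.columns.filter(_ != name).map(col)
    val expanded = fields.map(f => col(s"$name.$f").as(s"${name}_$f"))
    df.select(kept ++ expanded: _*)
  }

  // Hypothetical helper reproducing the usage example with the sketch above.
  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val df = Seq(0, 1).toDF("x")
    val f = udf((x: Int) => MyClass(x * 2, x * 3))

    flattenPrefixed(df.withColumn("test", f(col("x"))), "test")
  }
}
```

This version assumes that column names contain no dots, since `col` would interpret them as struct field access.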
Entertainer answered 7/6, 2016 at 13:49 Comment(1)
You don't have to do the whole withColumnMany thing. Just use select("select.*") to flatten it. - Mcclinton
-3

This can be easily achieved by using the pivot function:

    df4.groupBy("year").pivot("course").sum("earnings").collect()
Malagasy answered 19/1, 2017 at 6:2 Comment(1)
I don't see "year", "course" or "earnings" in any of the answers or o.p... what data frame are you talking about in this very terse answer (not)? - Salon
