Aggregating multiple columns with custom function in Spark

I was wondering if there is a way to specify a custom aggregation function for Spark DataFrames over multiple columns.

I have a table of (name, item, price) records like this:

john | tomato | 1.99
john | carrot | 0.45
bill | apple  | 0.99
john | banana | 1.29
bill | taco   | 2.59

I would like to aggregate each item and its cost for each person into a list like this:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)

Is this possible with DataFrames? I recently learned about collect_list, but it appears to only work for one column.
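A minimal sketch of the single-column collect_list mentioned above (assuming the table is a DataFrame named df with columns name, item, price):

import org.apache.spark.sql.functions.collect_list

// Collects only the items per person; the price column is lost,
// which is the limitation being asked about.
df.groupBy("name").agg(collect_list("item").as("items")).show(false)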

Horlacher answered 9/6, 2016 at 23:38 Comment(0)

The easiest way to do this as a DataFrame is to first collect two lists, and then use a UDF to zip the two lists together. Something like:

import org.apache.spark.sql.functions.{col, collect_list, udf}
import sqlContext.implicits._

val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

val df2 = df.groupBy("name").agg(
  collect_list(col("food")) as "food",
  collect_list(col("price")) as "price" 
).withColumn("food", zipper(col("food"), col("price"))).drop("price")

df2.show(false)
+----+---------------------------------------------+
|name|food                                         |
+----+---------------------------------------------+
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
|bill|[[apple,0.99], [taco,2.59]]                  |
+----+---------------------------------------------+
Changeover answered 10/6, 2016 at 11:37 Comment(8)
I used col(...) instead of $"..." for a reason -- I find col(...) works with less work inside of things like class definitions.Changeover
Is there any way to realign the columns before zipping, for example telling zip to first add an element at the tail of one column and remove one from its head, and then zip them? That way you could get, for example, the next price for each item if you read prices daily and there is a time column.Ane
Not entirely sure what you are asking, but you can use DataFrame.select(...) to change the order of columns.Changeover
I meant like this question: stackoverflow.com/q/39274585/2525128. I used this answer a lot in my code, but I am trying to use the same method for time-series data and add the next occurrence of an event for a specific observation as a field; since I do not know exactly when that happens, it is a little hard to make work.Ane
The answer assumes (maybe correctly) that collect_list() will preserve the order of elements on the two columns food & price. Meaning that food and price from the same row will end up at the same index in the two collected lists. Is this order preserving behavior guaranteed? (it would make sense, but I'm not sure by looking at the scala code for collect_list, not a scala programmer).Andrel
Afaik, there is no guarantee that the order of elements will be the same. cf : #40408014Autoxidation
I used a variation of this solution to zip five lists together. This gave me the opportunity to write the best line of code of my career so far: _ zip _ zip _ zip _ zip _Omidyar
Note: The function is non-deterministic because the order of collected results depends on order of rows which may be non-deterministic after a shuffle. spark.apache.org/docs/latest/api/python/…Proliferation
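Given the ordering concern raised in the comments above, a minimal sketch (assuming a Spark 2.x version where arrays of structs are orderable) that sidesteps it by pairing the columns per row and sorting the collected array:

import org.apache.spark.sql.functions.{col, collect_list, sort_array, struct}

// Pair food and price within each row, then sort the collected array
// so the result no longer depends on row order after a shuffle.
val stable = df.groupBy("name")
  .agg(sort_array(collect_list(struct(col("food"), col("price")))).as("foods"))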

Consider using the struct function to group the columns together before collecting as a list:

import org.apache.spark.sql.functions.{collect_list, struct}
import sqlContext.implicits._

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

df.groupBy($"name")
  .agg(collect_list(struct($"food", $"price")).as("foods"))
  .show(false)

Outputs:

+----+---------------------------------------------+
|name|foods                                        |
+----+---------------------------------------------+
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
|bill|[[apple,0.99], [taco,2.59]]                  |
+----+---------------------------------------------+
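If the individual fields are needed again downstream, the collected structs can be unpacked with explode; a sketch, not part of the original answer:

import org.apache.spark.sql.functions.{collect_list, explode, struct}

// Re-run the aggregation, then expand the array back into one row per
// (name, food, price) and pull the struct fields out by name.
val grouped = df.groupBy($"name")
  .agg(collect_list(struct($"food", $"price")).as("foods"))

grouped
  .select($"name", explode($"foods").as("item"))
  .select($"name", $"item.food", $"item.price")
  .show(false)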
Corporeity answered 10/3, 2017 at 19:50 Comment(2)
I want to mention that this approach looks cleaner than the accepted answer, but unfortunately doesn't work with spark 1.6, because collect_list() doesn't accept a struct.Geodesic
Works in Spark 2.1Mendenhall

Maybe a better way than the zip UDF (since UDFs and UDAFs are bad for performance) is to wrap the two columns into a struct before collecting.

This would probably work as well:

import org.apache.spark.sql.functions.{collect_list, struct}
import spark.implicits._  // enables the 'symbol column syntax (assumes a SparkSession named spark)

df.select('name, struct('food, 'price).as("tuple"))
  .groupBy('name)
  .agg(collect_list('tuple).as("tuples"))
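The same aggregation can also be expressed in Spark SQL; a sketch, assuming Spark 2.x with a SparkSession named spark and the DataFrame registered as a temporary view:

df.createOrReplaceTempView("purchases")  // view name is arbitrary

spark.sql("""
  SELECT name, collect_list(struct(food, price)) AS foods
  FROM purchases
  GROUP BY name
""").show(false)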
Jackquelinejackrabbit answered 6/8, 2018 at 17:38 Comment(0)

To your point that collect_list appears to only work for one column: for collect_list to work on multiple columns, wrap the columns you want to aggregate in a struct. For example:

import org.apache.spark.sql.functions.{collect_list, struct}

val aggregatedData = df.groupBy("name").agg(collect_list(struct("item", "price")).as("foods"))

aggregatedData.show(false)
+----+------------------------------------------------+
|name|foods                                           |
+----+------------------------------------------------+
|john|[[tomato, 1.99], [carrot, 0.45], [banana, 1.29]]|
|bill|[[apple, 0.99], [taco, 2.59]]                   |
+----+------------------------------------------------+
Polyethylene answered 2/3, 2020 at 4:14 Comment(0)

Here is an option that converts the DataFrame to an RDD of key-value pairs and then calls groupByKey on it. The result is a list of key-value pairs where each value is a list of tuples.

df.show
+----+------+----+
|  _1|    _2|  _3|
+----+------+----+
|john|tomato|1.99|
|john|carrot|0.45|
|bill| apple|0.99|
|john|banana|1.29|
|bill|  taco|2.59|
+----+------+----+


val tuples = df.map(row => row(0) -> (row(1), row(2)))
tuples: org.apache.spark.rdd.RDD[(Any, (Any, Any))] = MapPartitionsRDD[102] at map at <console>:43

tuples.groupByKey().map{ case(x, y) => (x, y.toList) }.collect
res76: Array[(Any, List[(Any, Any)])] = Array((bill,List((apple,0.99), (taco,2.59))), (john,List((tomato,1.99), (carrot,0.45), (banana,1.29))))
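A variation with concrete types instead of Any, converting back to a DataFrame at the end; a sketch, assuming Spark 2.x with a SparkSession named spark and the column types shown above:

import spark.implicits._

// Extract typed fields, group by name, and turn the grouped pairs
// back into a DataFrame with an array-of-structs column.
val typed = df.rdd.map(row => (row.getString(0), (row.getString(1), row.getDouble(2))))

typed.groupByKey()
  .map { case (name, items) => (name, items.toSeq) }
  .toDF("name", "foods")
  .show(false)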
Ara answered 10/6, 2016 at 2:20 Comment(0)
