Passing a list of tuples as a parameter to a Spark UDF in Scala

I am trying to pass a list of tuples to a UDF in Scala, but I am not sure how to define the data type for it. I tried passing it as a whole Row, but Spark can't resolve it. I need to sort the list by the first element of each tuple and then return the first n elements. I have tried the following UDF definitions:

def udfFilterPath = udf((id: Long, idList: Array[structType[Long, String]] )

def udfFilterPath = udf((id: Long, idList: Array[Tuple2[Long, String]] )

def udfFilterPath = udf((id: Long, idList: Row)

This is what the idList looks like:

[[1234,"Tony"], [2345, "Angela"]]
[[1234,"Tony"], [234545, "Ruby"], [353445, "Ria"]]

This is a DataFrame with 100 rows like the ones above. I call the UDF as follows:

testSet.select("id", "idList").withColumn("result", udfFilterPath($"id", $"idList")).show

When I print the DataFrame's schema, idList shows up as an array of structs. The idList column itself is generated by a collect_list over a column of tuples, grouped by a key. Any ideas on what I am doing wrong? Thanks!
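To reproduce the setup, a DataFrame of the same shape can be built roughly as follows. This is a minimal sketch, assuming the grouping key is the `id` column and reusing the question's sample values; the object name `IdListSketch` and the `pair` column name are hypothetical. A Scala tuple column is encoded as `struct<_1, _2>`, and `collect_list` over it produces the array of structs that the schema reports.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.collect_list

object IdListSketch {
  // Hypothetical input: one row per (key, (number, name)) pair.
  // Assumption: the grouping key is the `id` column.
  def buildIdLists(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val pairs = Seq(
      (1234L, (1234L, "Tony")),
      (1234L, (2345L, "Angela")),
      (234545L, (1234L, "Tony")),
      (234545L, (234545L, "Ruby")),
      (234545L, (353445L, "Ria"))
    ).toDF("id", "pair") // the tuple column becomes struct<_1: bigint, _2: string>

    // collect_list over a struct column yields array<struct<_1, _2>>,
    // which is the type the UDF receives as its input
    pairs.groupBy("id").agg(collect_list($"pair").as("idList"))
  }
}
```

Calling `IdListSketch.buildIdLists(spark).printSchema()` should show `idList` as an array whose elements are structs with fields `_1` and `_2`.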

Euphony asked 9/1, 2017 at 15:32

When defining a UDF, use plain Scala types (e.g. tuples, primitives) rather than Spark SQL types (e.g. StructType) as the output types.

As for the input types, this is where it gets tricky (and not well documented): an array of structs, which is what a collected list of tuples becomes, arrives in the UDF as a mutable.WrappedArray[Row]. So you'll have to convert each Row into a tuple first; then you can sort and return the result.

Lastly, judging by your description, the id column isn't used at all, so I removed it from the UDF definition; it can easily be added back.

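import scala.collection.mutable
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val udfFilterPath = udf { idList: mutable.WrappedArray[Row] =>
  // convert each struct Row into a (Long, String) tuple, sort by the first element, keep the first two
  idList.map(r => (r.getAs[Long](0), r.getAs[String](1))).sortBy(_._1).take(2)
}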

df.withColumn("result", udfFilterPath($"idList")).show(false)

+------+-------------------------------------------+----------------------------+
|id    |idList                                     |result                      |
+------+-------------------------------------------+----------------------------+
|1234  |[[1234,Tony], [2345,Angela]]               |[[1234,Tony], [2345,Angela]]|
|234545|[[1234,Tony], [2345454,Ruby], [353445,Ria]]|[[1234,Tony], [353445,Ria]] |
+------+-------------------------------------------+----------------------------+
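The question asks for n elements rather than a fixed two. One way to make n configurable is a small UDF factory that captures n in a closure. This is a sketch, not the answer's code: the names `FilterPathSketch`, `firstNById`, and the `n` parameter are hypothetical, and the input is declared as `Seq[Row]` instead of `mutable.WrappedArray[Row]`, as suggested in the comments. The sorting logic lives in a plain function so it can be checked without a SparkSession.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

object FilterPathSketch {
  // Plain Scala logic, kept outside the UDF so it can be tested directly.
  // Each Row is one struct from the array column: field 0 is the Long, field 1 the String.
  def firstNById(idList: Seq[Row], n: Int): Seq[(Long, String)] =
    idList
      .map(r => (r.getAs[Long](0), r.getAs[String](1))) // Row -> (Long, String)
      .sortBy(_._1)                                     // ascending by the first element
      .take(n)                                          // at most n tuples; fewer if the list is shorter

  // UDF factory: n is captured in the closure, so each call builds a UDF for a fixed n.
  // The Seq of tuples returned becomes an array<struct<_1: bigint, _2: string>> column.
  def udfFilterPath(n: Int): UserDefinedFunction =
    udf((idList: Seq[Row]) => firstNById(idList, n))
}
```

With `import spark.implicits._` in scope, `df.withColumn("result", FilterPathSketch.udfFilterPath(2)($"idList"))` should give the same `result` column as the answer's UDF. If n needs to vary per row, an alternative is a two-argument UDF that takes n as a column (e.g. via `lit`).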
Insincere answered 9/1, 2017 at 15:42

Comments:
Insincere: Updated the answer; indeed, the output types and input types differ in this case.
Euphony: Thank you so much!! I did have to sort it in the end, so your solution answered two of my questions. I had converted it into a Row but wasn't sure how to sort it afterwards, as I couldn't read it as a tuple.
Greatuncle: When the input is of type Array, you can use Seq in your UDF instead of mutable.WrappedArray.
