How to reference a dataframe from within a UDF on another dataframe?
How do you reference a pyspark dataframe during the execution of a UDF on another dataframe?

Here's a dummy example. I am creating two dataframes, scores and lastnames, and each contains a column that is the same across the two dataframes. In the UDF applied on scores, I want to filter on lastnames and return the matching string found in lastnames.

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local")
sqlCtx = SQLContext(sc)


# Generate Random Data
import itertools
import random
student_ids = ['student1', 'student2', 'student3']
subjects = ['Math', 'Biology', 'Chemistry', 'Physics']
random.seed(1)
data = []

for (student_id, subject) in itertools.product(student_ids, subjects):
    data.append((student_id, subject, random.randint(0, 100)))

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
            StructField("student_id", StringType(), nullable=False),
            StructField("subject", StringType(), nullable=False),
            StructField("score", IntegerType(), nullable=False)
    ])

# Create DataFrame 
rdd = sc.parallelize(data)
scores = sqlCtx.createDataFrame(rdd, schema)

# create another dataframe
last_name = ["Granger", "Weasley", "Potter"]
data2 = []
for i in range(len(student_ids)):
    data2.append((student_ids[i], last_name[i]))

schema = StructType([
            StructField("student_id", StringType(), nullable=False),
            StructField("last_name", StringType(), nullable=False)
    ])

rdd = sc.parallelize(data2)
lastnames = sqlCtx.createDataFrame(rdd, schema)


scores.show()
lastnames.show()


from pyspark.sql.functions import udf
def getLastName(sid):
    tmp_df = lastnames.filter(lastnames.student_id == sid)
    return tmp_df.last_name

getLastName_udf = udf(getLastName, StringType())
scores.withColumn("last_name", getLastName_udf("student_id")).show(10)

And here is the last part of the stack trace:

Py4JError: An error occurred while calling o114.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Cantone answered 30/12, 2016 at 4:21 Comment(3)
You cannot access a df inside a UDF because the UDF is executed on the executors, while the df reference is accessible from the driver only. You can use broadcast variables for lastnames. Let me know if you need any help.Yaya
But consider left-joining lastnames with scores rather than doing it from a UDF.Yaya
Hi @mrsrinivas, thanks for the reply. Firstly, I cannot use joins because, even though this dummy example can be solved using a join, in my actual implementation I need to do more processing within the UDF. Secondly, yes! How can I use broadcast variables in this case?Cantone

Change the pairs to a dictionary for easy lookup of names:

data2 = {}
for i in range(len(student_ids)):
    data2[student_ids[i]] = last_name[i]
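As a side note, the same lookup dictionary can be built in one line with Python's built-in zip:

# equivalent one-liner: map each student_id to its last name
data2 = dict(zip(student_ids, last_name))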

Instead of creating an RDD and turning it into a df, create a broadcast variable:

# rdd = sc.parallelize(data2)
# lastnames = sqlCtx.createDataFrame(rdd, schema)
lastnames = sc.broadcast(data2)

Now access it in the UDF via the value attribute of the broadcast variable (lastnames):

from pyspark.sql.functions import udf
def getLastName(sid):
    return lastnames.value[sid]
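For completeness, the UDF is then registered and applied exactly as in the question (a minimal sketch reusing the names above; it assumes every student_id has an entry in the broadcast dictionary):

getLastName_udf = udf(getLastName, StringType())
# each executor reads the broadcast dict locally instead of touching a driver-side df
scores.withColumn("last_name", getLastName_udf("student_id")).show(10)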
Yaya answered 30/12, 2016 at 4:45 Comment(4)
I modified your implementation with broadcast variables. Try to keep your UDF as pure a function as possible; too much external dependency can drag down performance.Yaya
I tried your code snippet: when I look at lastnames.value, I get [('student1', 'Granger'), ('student2', 'Weasley'), ('student3', 'Potter')], which means lastnames.value.filter wouldn't work anymore, right?Cantone
Yeah, it seems so. Try return lastnames.value[sid] in the UDF, and create the dictionary (variable data2) with sid as the key and the last name as the value.Yaya
Convert pd df to spark df.Methodist

You can't directly reference a dataframe (or an RDD) from inside a UDF. The DataFrame object is a handle on your driver that Spark uses to represent the data and the actions that will happen out on the cluster. The code inside your UDFs will run out on the cluster at a time of Spark's choosing. Spark does this by serializing that code, making copies of any variables included in the closure, and sending them out to each worker.

What you want to do instead is use the constructs Spark provides in its API to join/combine the two DataFrames. If one of the data sets is small, you can manually send out the data in a broadcast variable and then access it from your UDF. Otherwise, you can just create the two dataframes like you did, then use the join operation to combine them. Something like this should work:

joined = scores.withColumnRenamed("student_id", "join_id")
joined = joined.join(lastnames, joined.join_id == lastnames.student_id)\
               .drop("join_id")
joined.show()

+---------+-----+----------+---------+
|  subject|score|student_id|last_name|
+---------+-----+----------+---------+
|     Math|   13|  student1|  Granger|
|  Biology|   85|  student1|  Granger|
|Chemistry|   77|  student1|  Granger|
|  Physics|   25|  student1|  Granger|
|     Math|   50|  student2|  Weasley|
|  Biology|   45|  student2|  Weasley|
|Chemistry|   65|  student2|  Weasley|
|  Physics|   79|  student2|  Weasley|
|     Math|    9|  student3|   Potter|
|  Biology|    2|  student3|   Potter|
|Chemistry|   84|  student3|   Potter|
|  Physics|   43|  student3|   Potter|
+---------+-----+----------+---------+
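As a side note, since both frames share the student_id column name, passing the column name directly to join gives the same result without the rename-and-drop step, and keeps a single copy of the join column (an equivalent minimal sketch):

# equi-join on the shared column name; Spark keeps one student_id column
joined = scores.join(lastnames, on="student_id")
joined.show()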

It's also worth noting that, under the hood, Spark DataFrames have an optimization where a DataFrame that is part of a join can be converted to a broadcast variable to avoid a shuffle, if it is small enough. So if you use the join method listed above, you should get the best possible performance without sacrificing the ability to handle larger data sets.
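If you'd rather not rely on that size threshold, pyspark.sql.functions.broadcast lets you hint the broadcast join explicitly (a minimal sketch, assuming lastnames comfortably fits in executor memory):

from pyspark.sql.functions import broadcast

# explicitly mark the small frame so Spark plans a broadcast (map-side) join
joined = scores.join(broadcast(lastnames), on="student_id")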

Annamaeannamaria answered 30/12, 2016 at 16:40 Comment(0)