How to use a PySpark UDF in a Scala Spark project?

Several people (1, 2, 3) have discussed using a Scala UDF in a PySpark application, usually for performance reasons. I am interested in the opposite: using a Python UDF in a Scala Spark project.

I am particularly interested in building a model using sklearn (and MLflow) and then efficiently applying it to records in a Spark streaming job. I know I could also host the Python model behind a REST API and call that API from the Spark streaming application in mapPartitions, but managing concurrency for that and standing up an API to host the model isn't something I'm super excited about.

Is this possible without too much custom development with something like Py4J? Is this just a bad idea?

Thanks!

Periostitis asked 18/8, 2018 at 16:30 · Comments (11)
It is possible, though definitely not supported nor straightforward. So the question really is why you would even try. It is really hard to find a reasonable justification for such a process. – Eddieeddina
@user6910411 Thanks for the response. I explained the use case in the question: I'd like to use a model I trained using sklearn to evaluate individual rows in a structured streaming application. – Periostitis
I guess the question is: if you already want to pay the price of inter-language communication, why not go with PySpark all the way? – Eddieeddina
In this case, because 1) the Python operation will be a small piece of a larger Spark job, and I'd rather not pay the PySpark penalty for the whole thing, and 2) I already have a mature Scala project and just want to add a bit of Python without needing a rewrite. – Periostitis
Not submitting this as an answer, but if you use Databricks you can use Scala and Python in the same job. You can hop over to PySpark for that UDF with sklearn, score the records, and then immediately transition back to Scala downstream. The common layer is Spark tables in Spark SQL: PySpark can read and write them, as can Spark in Scala (obviously). I'm not sure how you would do this with pure open source, or whether something like Zeppelin supports this. (Full disclosure: I work for Databricks.) – Cove
Thanks @RaphaelK: we are Databricks customers at the company I work for, so I'm aware of that option, at least for notebook-powered jobs. – Periostitis
Were you able to find a solution for this? I have a similar problem: a mature Spark/Scala project where I need to load models generated with sklearn or even PyTorch and then run predictions on a large dataset. – Cowhide
@ShirishKumar Kind of. I recently had a UDF that was doing some complex network operations, and I wanted to reuse the open connection and add some response caching. What I did was create a global object and have the UDF make its calls through that object; the global managed the connection, cached responses, etc. If I had to solve this today I'd take the same approach, and also open a process running Python and feed records through that process. This is similar to how Python UDFs work in PySpark (all data goes through stdin/stdout). – Periostitis
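
For posterity, a rough sketch of that global-object pattern in Scala. The ScoringClient name, the HTTP client, and the scoring stub are all illustrative assumptions, not the original code:

    import org.apache.spark.sql.functions.udf
    import scala.collection.concurrent.TrieMap

    // Illustrative sketch only: ScoringClient and score are hypothetical names.
    object ScoringClient {
      // Initialised lazily, once per executor JVM, and reused by every UDF call there.
      lazy val client: java.net.http.HttpClient = java.net.http.HttpClient.newHttpClient()

      // Simple response cache so repeated inputs skip the network round trip.
      private val cache = TrieMap.empty[String, Double]

      def score(features: String): Double =
        cache.getOrElseUpdate(features, callModel(features))

      private def callModel(features: String): Double = {
        // The real request through `client` would go here; stubbed out for the sketch.
        features.length.toDouble
      }
    }

    // The UDF only closes over a call to the object, so nothing heavyweight is serialised.
    val scoreUdf = udf((features: String) => ScoringClient.score(features))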
Some examples of this stdin/stdout pattern inside Spark itself: github.com/apache/spark/blob/master/core/src/main/scala/org/…, github.com/apache/spark/blob/v3.0.1/sql/core/src/main/scala/org/…, github.com/apache/spark/blob/v3.0.1/core/src/main/scala/org/…, github.com/apache/spark/blob/v3.0.1/core/src/main/scala/org/… The last one is the main entry point for running Python UDFs in Spark. – Periostitis
To summarize the above: look at how Python UDFs work by checking out PythonRunner. The code is private[spark], but you can still access it by putting a wrapper object inside the org.apache.spark package. There is a lot of fanciness in there that you may not need; if you don't need UDFs, a simple mapPartitions (spinning up a Python process per partition) may suffice to call your code. – Periostitis
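
As an illustration of that simpler mapPartitions route, a minimal sketch, assuming a hypothetical score.py that reads one feature string per line on stdin, writes one prediction per line on stdout, and flushes after every line:

    import java.io.{BufferedReader, InputStreamReader, PrintWriter}
    import org.apache.spark.sql.Dataset

    def scoreWithPython(features: Dataset[String]): Dataset[String] = {
      val spark = features.sparkSession
      import spark.implicits._

      features.mapPartitions { rows =>
        // One python process per partition; it lives for the lifetime of the partition.
        val proc       = new ProcessBuilder("python", "score.py").start()
        val toPython   = new PrintWriter(proc.getOutputStream, true)
        val fromPython = new BufferedReader(new InputStreamReader(proc.getInputStream))

        val results = rows.map { row =>
          toPython.println(row)      // send one record over stdin
          fromPython.readLine()      // read its prediction back from stdout
        }.toList                     // materialise before tearing the process down

        toPython.close()
        proc.waitFor()
        results.iterator
      }
    }

PythonRunner is essentially a much more robust version of this line-by-line protocol, with batching, serialization, and error handling on top.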
Please have a look at https://mcmap.net/q/1012740/-how-to-use-pyspark-udf-in-java-scala-spark-project/6380624 – Novosibirsk

Maybe I'm late to the party, but at least I can help with this for posterity. This is actually achievable by creating your Python UDF and registering it with spark.udf.register("my_python_udf", foo). You can view the docs here: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.UDFRegistration.register

The function can then be called through SQL from Python, Scala, Java, R, or really any language, because you're going through the shared sqlContext (where the UDF is registered). For example, you would call something like

spark.sql("SELECT my_python_udf(...)").show()

PROS - You get to call your sklearn model from Scala.

CONS - You have to use the sqlContext and write SQL-style queries.
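
For a concrete picture of the Scala side, a minimal sketch, assuming "my_python_udf" has already been registered on this same SparkSession from Python; the data path, table, and column names are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    // Expose the data to SQL; "events" and its columns are illustrative.
    spark.read.parquet("/path/to/events").createOrReplaceTempView("events")

    // The registered Python UDF is resolved by name through the shared session.
    spark.sql("SELECT id, my_python_udf(features) AS score FROM events").show()

The key constraint is that registration and the SQL call must happen against the same session, which is what the comments below discuss.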

I hope this helps, at least for any future visitors.

Fathometer answered 25/11, 2019 at 14:42 · Comments (3)
Thanks for this. It looks like we should be able to submit Python zips alongside a primary jar for a Spark job and use those zips as dependencies. – Periostitis
I think you're describing a situation where you have a context in a Python process, register the UDF there, and then reuse that context from a JVM that can access it. That would be possible in a Databricks notebook, but not when I have a single job that I start with spark-submit. – Vanesavanessa
I'm looking for a way to create a PySpark UDF for a SparkContext created from Scala. The link you gave is now broken. – Housing
