Invoking a PySpark Script from Scala Spark Code
I have a Scala Spark application and want to invoke PySpark/Python (pyspark_script.py) for further processing.

There are plenty of resources on calling Java/Scala code from Python, but I am looking for the other direction: Scala -> PySpark.

I explored Jython, which lets Scala/Java embed Python code, as follows:

import org.python.util.PythonInterpreter

PythonInterpreter.initialize(System.getProperties, properties, sysArgs)
val pi = new PythonInterpreter()
pi.execfile("path/to/pyscript/mypysparkscript.py")

I get an error that says: "ImportError: No module named pyspark"

Is there any way for Scala Spark to talk to PySpark using the same SparkContext/session?

Empale answered 12/8, 2021 at 19:27 Comment(0)

You can run shell commands in Scala using the sys.process package.

// Spark code goes here .....
// Call the PySpark script
import sys.process._
"python3 /path/to/python/file.py".!!

To use the same session, add the line below to the Python file.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

You can also use the SparkSession.getActiveSession() method.

NOTE: Make sure the pyspark module is installed. You can do that with the pip3 install pyspark command.

Newmint answered 12/8, 2021 at 20:26 Comment(5)
Thanks for suggesting this approach. With it, when I run the script I see the error "class not found for jdbc driver", whereas when running with pyspark directly I was able to point to the jars. I tried adding the jar and dependency in Scala as well, but no luck. – Empale
Did you add the JDBC jar to the $SPARK_HOME/jars folder? – Newmint
That helped; I added it to the jars folder. Thanks! One other hurdle says "py4j.protocol.Py4JJavaError: An error occurred while calling o405.javaToPython. : java.lang.IllegalArgumentException". I am running in IntelliJ with Java 8. – Empale
Please update your question with sample code and indicate which line gives the error. – Newmint
It worked after I cleared my environment variables in IntelliJ. Thanks for helping! – Empale

Symbiosis of Scala Spark and PySpark

The library solves the problem of interaction between Spark applications developed in Scala and Python. It helps when Spark manipulations need to be performed first in Scala and then in Python within a single run. One common reason for needing this is that rewriting the code from one language to the other is not practical.

How to use:

For a quick introduction, go to the demo repository: ScalaPySparkDemo

  • Create a new Scala project.

  • Add the dependencies to build.sbt:

    libraryDependencies ++= Seq(
      "ru.mardaunt"        %% "pysparkwrapper" % "0.1.0",
      "org.apache.spark"   %% "spark-sql"      % "3.3.2"
    )
    
  • Prepare your Scala Spark application. In our example it is minimal:

    package ru.example
    
    import org.apache.spark.sql.SparkSession
    
    object PySparkDemo extends App {
    
      lazy val spark = SparkSession.builder()
                                   .master("local[*]")
                                   .getOrCreate()
    
    }
    
  • Prepare your PySpark application and place it in the resources.

  • Create a class responsible for preparing the PySpark application for launch. To do this, extend the abstract PySparkApp class. This is a wrapper class over the Python project.

    package ru.example
    
    import org.apache.log4j.Logger
    import org.apache.spark.sql.SparkSession
    import ru.mardaunt.python.PySparkApp
    import ru.mardaunt.python.logger.SimpleLogger
    
    class PySparkDemo(spark: SparkSession, logger: Logger)
      extends PySparkApp(mainPyName = "pyspark_main.py", needKerberosAuth = false)(spark, logger) {
    
      override protected val starterTool: String = "spark-submit"
    }
    

    Note that the package where the wrapper class lives must match the package of the Python application in the resources. In our case it is ru.example.

  • The application is ready to launch:

    import ru.mardaunt.python.logger.SimpleLogger
    
    new PySparkDemo(spark, SimpleLogger()).run()
    

    If you are running the application locally in the IDE, make sure that Spark is installed on the computer.

    If you want to run the application on a cluster, build a JAR. Make sure you build a fat JAR (see the sbt-assembly sketch below), because we have specified an external dependency:

    "ru.mardaunt" %% "pysparkwrapper" % "0.1.0"
    

    You can skip building a fat JAR if you pass the pysparkwrapper.jar artifact to the --jars option of the spark-submit command.

    Or you can simply copy all the files from the library's ru.mardaunt.python package into your project.
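
    If you do build a fat JAR, the sbt-assembly plugin is one common way to do it. A minimal sketch (the plugin version is an assumption; check for the latest release):

      // project/plugins.sbt
      addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")

    Then run sbt assembly and pass the resulting JAR to spark-submit. Depending on your dependencies you may also need assemblyMergeStrategy settings to resolve META-INF conflicts.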

Congratulations! Now you know how to use the library.


FAQ

How do I change the configuration in the PySpark App?

  • Override this field in your PySparkApp wrapper subclass:
        override protected val additionalSparkConfList: List[String] =
          List(
            "--conf", "spark.app.name=MY_FAVORITE_APP",
            "--conf", "spark.driver.cores=4"
          )
    

How do I pass arguments to the PySpark App?

  • You can pass a list of arguments to the "run" method:
      val args = List("a", "b", "c")
      new PySparkDemo(spark, SimpleLogger()).run(args)
    
    Or override the wrapper class field:
      override protected val pythonArgs: List[String] = List("a", "b", "c")
    

How do I enable Kerberos authorization?

  • By default, Kerberos authorization is enabled, but you can control it with the flag passed to the wrapper class constructor:
      needKerberosAuth = false
    

I need specific dependencies in Python. How can I use my dependencies in a PySpark app?

  • You should already have a Python environment with the required libraries installed. Then configure your PySpark app by passing the path to that Python interpreter to the driver and executors:
        override protected val additionalSparkConfList: List[String] =
          List(
            "--conf", "spark.app.name=MY_FAVORITE_APP",
            "--conf", "spark.driver.cores=4",
            "--conf", "spark.pyspark.python=/your/python/loc/bin/python",
            "--conf", "spark.pyspark.driver.python=/your/python/loc/bin/python"
          )
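
Putting the FAQ answers together, a wrapper class that uses several of these overrides at once might look like this (a sketch that only combines the snippets shown above):

    package ru.example

    import org.apache.log4j.Logger
    import org.apache.spark.sql.SparkSession
    import ru.mardaunt.python.PySparkApp

    class PySparkDemo(spark: SparkSession, logger: Logger)
      extends PySparkApp(mainPyName = "pyspark_main.py", needKerberosAuth = false)(spark, logger) {

      override protected val starterTool: String = "spark-submit"

      // Extra spark-submit configuration for the PySpark application.
      override protected val additionalSparkConfList: List[String] =
        List(
          "--conf", "spark.app.name=MY_FAVORITE_APP",
          "--conf", "spark.pyspark.python=/your/python/loc/bin/python",
          "--conf", "spark.pyspark.driver.python=/your/python/loc/bin/python"
        )

      // Default arguments passed to the PySpark application.
      override protected val pythonArgs: List[String] = List("a", "b", "c")
    }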
    
Wb answered 4/8, 2023 at 13:38 Comment(0)
