How to add a SparkListener from pySpark in Python?

I want to create a Jupyter/IPython extension to monitor Apache Spark Jobs.

Spark provides a REST API.

However instead of polling the server, I want the event updates to be sent through callbacks.
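
Just for context, the polling I would like to avoid looks roughly like this (a rough sketch against the documented /api/v1 endpoints, assuming the driver UI is reachable on the default port 4040):

import requests

# Poll the Spark UI REST API for job status instead of receiving callbacks.
base = "http://localhost:4040/api/v1"
app_id = requests.get(base + "/applications").json()[0]["id"]
for job in requests.get(base + "/applications/" + app_id + "/jobs").json():
    print(job["jobId"], job["status"])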

I am trying to register a SparkListener with SparkContext.addSparkListener(). This feature is not exposed on the PySpark SparkContext object in Python, so how can I register a Python listener with the Scala/Java version of the context from Python? Is it possible to do this through Py4J? I want Python functions to be called when the events fire in the listener.

Golightly answered 20/5, 2017 at 7:5 Comment(0)

It is possible, although it is a bit involved. We can use the Py4j callback mechanism to pass messages from a SparkListener. First, let's create a Scala package with all the required classes. Directory structure:

.
├── build.sbt
└── src
    └── main
        └── scala
            └── net
                └── zero323
                    └── spark
                        └── examples
                            └── listener
                                ├── Listener.scala
                                ├── Manager.scala
                                └── TaskListener.scala

build.sbt:

name := "listener"

organization := "net.zero323"

scalaVersion := "2.11.7"

val sparkVersion = "2.1.0"

libraryDependencies ++= List(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "net.sf.py4j" % "py4j" % "0.10.4"  // Just for the record
)

Listener.scala defines the interface that our Python class will implement later:

package net.zero323.spark.examples.listener

/* You can add arbitrary methods here, 
 * as long as these match corresponding Python interface 
 */
trait Listener {
  /* This will be implemented by a Python class.
   * You can of course use more specific types, 
   * for example here String => Unit */
  def notify(x: Any): Any
}

Manager.scala will be used to forward messages to the Python listeners:

package net.zero323.spark.examples.listener

object Manager {
  var listeners: Map[String, Listener] = Map()

  def register(listener: Listener): String = {
    this.synchronized {
      val uuid = java.util.UUID.randomUUID().toString
      listeners = listeners + (uuid -> listener)
      uuid
    }
  }

  def unregister(uuid: String) = {
    this.synchronized {
      listeners = listeners - uuid
    }
  }

  def notifyAll(message: String): Unit = {
    for { (_, listener) <- listeners } listener.notify(message)
  }

}

Finally, a simple SparkListener:

package net.zero323.spark.examples.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

/* A simple listener which captures SparkListenerTaskEnd,
 * extracts numbers of records written by the task
 * and converts to JSON. You can of course add handlers 
 * for other events as well.
 */
class PythonNotifyListener extends SparkListener { 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
    val message = compact(render(
      ("recordsWritten" ->  recordsWritten)
    ))
    Manager.notifyAll(message)
  }
}

Let's package our extension:

sbt package

and start a PySpark session, adding the generated jar to the class path and registering the listener:

 $SPARK_HOME/bin/pyspark \
   --driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \
   --conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener

Next we have to define a Python object which implements the Listener interface:

from pyspark import SparkContext

class PythonListener(object):
    package = "net.zero323.spark.examples.listener"

    @staticmethod
    def get_manager():
        jvm = SparkContext.getOrCreate()._jvm
        manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager"))
        return manager

    def __init__(self):
        self.uuid = None

    def notify(self, obj):
        """This method is required by Scala Listener interface
        we defined above.
        """
        print(obj)

    def register(self):
        manager = PythonListener.get_manager()
        self.uuid = manager.register(self)
        return self.uuid

    def unregister(self):
        manager = PythonListener.get_manager()
        manager.unregister(self.uuid)
        self.uuid = None

    class Java:
        implements = ["net.zero323.spark.examples.listener.Listener"]

start the callback server:

sc._gateway.start_callback_server()

create the listener:

listener = PythonListener()

register it:

listener.register()

and test:

>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
{"recordsWritten":33}
{"recordsWritten":34}
{"recordsWritten":33}

On exit you should shut down the callback server:

sc._gateway.shutdown_callback_server()

Note:

This should be used with caution when working with Spark streaming, which internally uses the callback server.
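
If you need to guard against that, one option is to check whether a callback server was already running before you started one (a sketch, assuming start_callback_server() returns False when a server is already up, as Py4J documents):

# Only tear down the callback server if this code was the one that started it,
# so we don't shut down a server that Spark streaming already relies on.
started_here = sc._gateway.start_callback_server()

listener = PythonListener()
listener.register()
try:
    sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
finally:
    listener.unregister()
    if started_here:
        sc._gateway.shutdown_callback_server()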

Edit:

If this is too much hassle you can just implement org.apache.spark.scheduler.SparkListenerInterface directly:

class SparkListener(object):
    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass
    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]

extend it:

class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        print(taskEnd.toString())

and use it directly:

>>> sc._gateway.start_callback_server()
True
>>> listener = TaskEndListener()
>>> sc._jsc.sc().addSparkListener(listener)
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple")
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@9e7514a,org.apache.spark.executor.TaskMetrics@51b8ba92)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@71278a44,org.apache.spark.executor.TaskMetrics@bdc06d)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@336)

While simpler, this method is not selective (more traffic between the JVM and Python) and requires handling Java objects inside the Python session.
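
If you do use this variant, you can at least keep the handler selective by pulling only the fields you need out of the Java event object (a sketch; through the Py4j proxy, Scala field accessors such as taskMetrics are invoked as methods):

class RecordsWrittenListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        # taskEnd is a Py4j proxy for the Scala SparkListenerTaskEnd event,
        # so its case-class fields are accessed as method calls.
        metrics = taskEnd.taskMetrics()
        if metrics is not None:
            print(metrics.outputMetrics().recordsWritten())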

Xylon answered 20/5, 2017 at 9:12 Comment(3)
just reading this made me seriously consider switching to scala ^^Pasteurize
SparkListenerInterface has had methods added in recent Spark versions. Update as needed to work with your Spark version. For Spark 2.4.6 I had to add onBlockManagerAdded.Closelipped
What's the caveat here of using this with structured streaming? Should sc._gateway.start_callback_server() not be explicitly enabled?Harte

I know this is a very old question. However, I ran into this very same issue where we had to configure a custom-developed listener in a PySpark application. It's possible that the approach has changed over the last few years.

All we had to do was specify the dependent jar file that contained the listener class and also set the --conf spark.extraListeners property.

Example

--conf spark.extraListeners=fully.qualified.path.to.MyCustomListenerClass --conf my.param.name="hello world"

MyCustomListenerClass can have a single-argument constructor that accepts a SparkConf object. If you want to pass any parameters to your listener, just set them as configuration key-value pairs and you should be able to access them from the constructor.

Example

public MyCustomListenerClass(SparkConf conf) {
    this.myParamName = conf.get("my.param.name", "default_param_value");
}
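
If you would rather wire this up from the PySpark side than on the spark-submit command line, the same settings can go through the session builder (a sketch; the jar path below is a placeholder for your own listener artifact):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # placeholder path to the jar containing MyCustomListenerClass
    .config("spark.jars", "/path/to/my-listener.jar")
    .config("spark.extraListeners", "fully.qualified.path.to.MyCustomListenerClass")
    .config("my.param.name", "hello world")
    .getOrCreate()
)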

Hope this helps someone looking for a simpler strategy. The approach works for both Scala and PySpark because nothing changes in the Spark application; the framework takes care of registering your listener once you pass the extraListeners parameter.

Adaadabel answered 25/5, 2022 at 14:4 Comment(0)
