Is it possible to execute a command on all workers within Apache Spark?

I have a situation where I want to execute a system process on each worker within Spark. I want this process to run on each machine exactly once. Specifically, this process starts a daemon which needs to be running before the rest of my program executes. Ideally this should happen before I've read any data in.

I'm on Spark 2.0.2 and using dynamic allocation.

Commando asked 29/11, 2016 at 19:10 Comment(1)
Duplicate of: #37343937 – Seldan

You may be able to achieve this with a combination of a lazy val and a Spark broadcast. It would look something like the code below. (I have not compiled it; you may have to change a few things.)

import sys.process._
// Scala objects are initialized at most once per JVM (i.e. once per executor);
// `extends Serializable` lets the object be broadcast below.
object ProcessManager extends Serializable {
  lazy val start: Int = Seq("bash", "-c", "echo start your daemon here").! // put your real daemon command here
}

You can broadcast this object at the start of your application before you do any transformations.

val pm = sc.broadcast(ProcessManager)

Now, you can access this object inside your transformations like any other broadcast variable and invoke the lazy val.

rdd.mapPartitions { itr =>
  pm.value.start // forces the lazy val; runs at most once per executor JVM
  // Other stuff here.
  itr
}
Uella answered 29/11, 2016 at 22:07 Comment(5)
Won't this execute the process once per partition and not once per worker? – Commando
You are right, that is just an example. But since it is a lazy val and ProcessManager is an object, it runs only once per executor JVM. – Uella
Broadcasting that object is a little odd. You should broadcast data, not code. Just having the object and accessing the start value is enough; that way the ProcessManager object does not need to be serializable (see the sketch after these comments). – Vaughnvaught
@Uella could you please help me figure out the Java analogy? – Caballero
@Uella won't your code now run only on the workers that hold partitions of the rdd, not on all workers? Am I wrong? – Caballero
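
For concreteness, here is a minimal sketch of the broadcast-free variant suggested in the comments, reusing the ProcessManager object from the answer above. The closure references the singleton directly, so nothing needs to be serializable:

rdd.mapPartitions { itr =>
  ProcessManager.start // still evaluated at most once per executor JVM
  itr
}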

An object with static initialization which invokes your system process should do the trick.

object SparkStandIn extends App {
  object invokeSystemProcess {
    import sys.process._
    val errorCode = "echo Whatever you put in this object should be executed once per jvm".!

    def doIt(): Unit = {
      // This object is constructed once per JVM, but objects are lazy in
      // Scala, so nothing runs until the first access. Another way to make
      // sure instantiation happened is to check that errorCode does not
      // represent an error.
    }
  }
  invokeSystemProcess.doIt()
  invokeSystemProcess.doIt() // even if doIt is invoked multiple times, the static initialization happens once
}
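
One way to force the initialization eagerly on the executors, before any data is read, is to run a throwaway job that touches every executor. This is a sketch, not part of the answer above; it assumes a SparkContext named sc and that numSlots is at least the total number of executor cores (both assumptions):

val numSlots = 100 // assumption: >= total number of executor cores
sc.parallelize(1 to numSlots, numSlots).foreachPartition { _ =>
  SparkStandIn.invokeSystemProcess.doIt() // triggers the one-time static initialization
}

Note that with dynamic allocation (as in the question), executors that start later will still initialize lazily on their first task; this warm-up only covers the executors alive at that moment.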
Vaughnvaught answered 29/11, 2016 at 19:23 Comment(1)
But how do you ensure it is actually initialized without repeating calls on every transformation? – Stob

A specific answer for a specific use case: I have a cluster with 50 nodes and I wanted to know which ones have the CET timezone set:

import spark.implicits._ // assumes a SparkSession named `spark`

(1 to 100).toDS
  .mapPartitions { _ =>
    val out = sys.process.Process(
      Seq("bash", "-c", "echo $(hostname && date)")
    ).lineStream // the deprecated `.lines` on older Scala versions
    out.toIterator
  }
  .collect()
  .filter(_.contains(" CET "))
  .distinct
  .sorted
  .foreach(println)

Note that I don't think it's guaranteed you'll get a partition on every node, so the command might not run on every node, even with a 100-element Dataset on a 50-node cluster as in the example above.
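To see which nodes were actually reached, one option (a sketch, assuming a SparkSession named spark) is to oversubscribe partitions and collect the distinct hostnames that ran a task:

val reached = spark.sparkContext
  .parallelize(1 to 1000, 1000) // many small partitions to spread tasks widely
  .mapPartitions(_ => Iterator(java.net.InetAddress.getLocalHost.getHostName))
  .collect()
  .distinct
  .sorted
reached.foreach(println) // compare against your list of 50 nodes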

Clynes answered 13/12, 2019 at 11:30 Comment(0)
