Spark broadcast variable throws NullPointerException when run on an Amazon EMR cluster

The variables I share via broadcast are null in the cluster.

My application is quite complex, but I have written this small example, which runs flawlessly when I run it locally yet fails in the cluster:

package com.gonzalopezzi.bigdata.bicing

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 extends App {
  val conf = new SparkConf().setAppName("PruebaBroadcast2")
  val sc = new SparkContext(conf)

  val arr : Array[Int] = (6 to 9).toArray
  val broadcasted = sc.broadcast(arr)

  val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2) // the small sequence [1, 2, 3, 4] is parallelized into two partitions
  rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println)  // NullPointerException in the flatMap: broadcasted is null on the executors

}

I don't know if the problem is a coding error or a configuration issue.

This is the stacktrace I get:

15/07/07 20:55:13 INFO scheduler.DAGScheduler: Job 0 failed: collect at PruebaBroadcast2.scala:24, took 0.992297 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-36-49.ec2.internal): java.lang.NullPointerException
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Command exiting with ret '1'

Can anyone help me fix this?

At the very least, can you tell me whether you see anything strange in the code? If the code looks fine, please say so, since that would mean the problem lies in the cluster configuration.

Fairtrade answered 8/7, 2015 at 21:11

Finally I got it working.

It doesn't work if you declare the object like this:

object MyObject extends App {

But it works if you declare an object with a main() method:

object MyObject {
  def main(args: Array[String]) {
    /* ... */
  }
}

So, the short example in the question works if I rewrite it this way:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 {

  def main (args: Array[String]) {
    val conf = new SparkConf().setAppName("PruebaBroadcast2")
    val sc = new SparkContext(conf)

    val arr : Array[Int] = (6 to 9).toArray
    val broadcasted = sc.broadcast(arr)

    val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

    rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println)
  }
}

This problem seems related to this bug: https://issues.apache.org/jira/browse/SPARK-4170
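
As far as I can tell, the underlying reason is that scala.App is based on DelayedInit: the statements in the object body, including the val initializers, are not run in the constructor but are deferred until main() is invoked. When the task closure is deserialized on an executor, the singleton is loaded there but its main() never runs, so its fields still hold their default values. Here is a minimal, Spark-free sketch of the effect (object and field names are mine; Scala 2.10/2.11, the versions Spark used at the time):

object AppFields extends App {
  // The assignment below is deferred by DelayedInit; it only runs when AppFields.main() is called.
  val greeting: String = "hello"
}

object Demo {
  def main(args: Array[String]): Unit = {
    // AppFields.main() has not been called, so the deferred body never ran:
    // the field still has the default value for a reference type.
    println(AppFields.greeting) // prints null, not "hello"
  }
}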

Fairtrade answered 9/7, 2015 at 16:37
The bug states "fixed", but I still seem to run into the same issue (CDH 5.5.2). (Osteoporosis)
The bug states "fixed", but the fix just prints a warning: "Subclasses of scala.App may not work correctly. Use a main() method instead." (Aldose)
It's a trick, but aesthetically a little more pleasing to me: you can do object PruebaBroadcast2 extends App {{ /* your code */ }} (Bailiff)
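
For reference, my reading of the double-brace trick from the last comment, applied to the example in the question, is the untested sketch below: the extra pair of braces turns the vals into locals of an inner block instead of fields of the App object, so the closures capture and serialize them with the task.

object PruebaBroadcast2 extends App {{ // note the double braces
  val conf = new SparkConf().setAppName("PruebaBroadcast2")
  val sc = new SparkContext(conf)

  // Locals of the inner block, not fields of the object, so the flatMap closure
  // captures broadcasted directly instead of reading a null field on the executor.
  val arr: Array[Int] = (6 to 9).toArray
  val broadcasted = sc.broadcast(arr)

  val rdd: RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)
  rdd.flatMap((a: Int) => List((a, broadcasted.value(0)))).reduceByKey(_ + _).collect().foreach(println)
}}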

I had a similar issue: I defined a variable in the object, used it in an RDD map function, and got a null value. This is my original code:

object MyClass extends App {
    ...
    val prefix = "prefix" 
    val newRDD = inputRDD.map(s => prefix + s) // got null for prefix
    ...
}

And I found that it works in any function, not just main():

object MyClass extends App {
    ...
    val prefix = "prefix"
    val newRDD = addPrefix(inputRDD, prefix)
    def addPrefix(input: RDD[String], prefix: String): RDD[String] = {
        // prefix is now a method parameter, so the map closure captures it directly
        // and serializes it with the task instead of reading a field of the App object
        input.map(s => prefix + s)
    }
}
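
Applied to the broadcast example from the question, the same idea would look roughly like this (untested sketch); the object can even keep extends App, as long as the Spark work happens inside a method so that broadcasted is a local value captured by the closure:

object PruebaBroadcast2 extends App {
  val conf = new SparkConf().setAppName("PruebaBroadcast2")
  val sc = new SparkContext(conf)
  run(sc)

  def run(sc: SparkContext): Unit = {
    // broadcasted and rdd are locals of run(), so the closure serializes them with the task
    val broadcasted = sc.broadcast((6 to 9).toArray)
    val rdd = sc.parallelize((1 to 4).toSeq, 2)
    rdd.flatMap(a => List((a, broadcasted.value(0)))).reduceByKey(_ + _).collect().foreach(println)
  }
}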
Scirrhous answered 13/4, 2018 at 21:28
