Scalatest and Spark giving "java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper"

I’m testing a Spark Streaming application with the help of "com.holdenkarau.spark-testing-base" and ScalaTest.

import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.rdd.RDD
import org.scalatest.{ BeforeAndAfter, FunSuite }

class Test extends FunSuite with BeforeAndAfter with StreamingSuiteBase {

  var delim: String = ","

  before {
    System.clearProperty("spark.driver.port")
  }

  test("This Fails") {

    val source = scala.io.Source.fromURL(getClass.getResource("/some_logs.csv"))
    val input = source.getLines.toList

    // Calculator is the application's own object (not shown); `do` is a Scala keyword, so it must be backquoted
    val rowRDDOut = Calculator.`do`(sc.parallelize(input))   // Returns DataFrame

    val report: RDD[String] = rowRDDOut.map(row => row.getAs[String](0) + delim + row.getAs[String](1))

    source.close
  }
}

I get a serialization exception caused by the field 'delim':

org.apache.spark.SparkException: Task not serializable
[info]   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
[info]   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
[info]   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
[info]   at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
[info]   at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
[info]   at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
[info]   at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
[info]   at org.apache.spark.rdd.RDD.map(RDD.scala:323)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
[info] Serialization stack:
[info]  - object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: org.scalatest.Assertions$AssertionsHelper@78b339fa)
[info]  - field (class: org.scalatest.FunSuite, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper)

If I replace 'delim' with the string literal inline, it works fine:

val report: RDD[String] = rowRDDOut.map(row => row.getAs[String](0) + "," + row.getAs[String](1))

What’s the difference between the first and the second version?

Thanks in advance!

Carinacarinate asked 7/2, 2017 at 3:7

The problem is not the type of delim (String); it's the fact that delim is a field of your test class.

Try not to define variables outside your test() blocks. If you define delim inside your test, it should work:

test("This Fails") {
  val delim = ","
   ...
}

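Now, you may ask: why? Because delim is a field of class Test, referencing it inside the map closure really means this.delim, so the closure captures the enclosing Test instance. Spark then has to serialize that instance together with the closure, and it contains a reference to org.scalatest.Assertions$AssertionsHelper, which is not Serializable (see the last lines of your stack trace). A local val, or a string literal as in your second version, is captured on its own, so the test class never ends up in the closure.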
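
A minimal sketch of this capture without Spark, assuming Scala 2.12 or later. It uses plain Java serialization, which is what Spark's closure serializer relies on, so the outcome should match what ClosureCleaner reports. DelimSuite and NonSerializableHelper are hypothetical stand-ins for the test class and for AssertionsHelper.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.scalatest.Assertions$AssertionsHelper: not Serializable.
class NonSerializableHelper

// Hypothetical stand-in for the test class. Like a ScalaTest suite it is Serializable,
// but it holds a field that is not.
class DelimSuite extends Serializable {
  val helper = new NonSerializableHelper
  var delim: String = ","

  // `delim` here means `this.delim`, so the closure captures the whole DelimSuite.
  def capturingClosure: String => String = row => row + delim

  // The field is copied into a local val first, so the closure captures only the String.
  def localCopyClosure: String => String = {
    val localDelim = delim
    row => row + localDelim
  }
}

object ClosureCaptureDemo {
  // Plain Java serialization, the same mechanism Spark uses to check closures.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val suite = new DelimSuite
    println(isSerializable(suite.capturingClosure)) // false
    println(isSerializable(suite.localCopyClosure)) // true
  }
}
```

The same reasoning covers the string literal: the literal is compiled into the lambda body, so the lambda needs nothing from the test class.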

Pravit answered 7/2, 2017 at 3:18
Whoa, sir! I couldn't even think of that! Thanks! Worked like a charm. – Carinacarinate
I was getting the same error, but I never thought my test class was in the closure. – Foust
Had a similar issue with a ScalaCheck generator; for some reason it must be declared within a test for things to work. :( – Estren

I ran into this today, and the error persisted even after I moved all my code inside the test as mentioned in the accepted answer.

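Finally, I found out that I was using the wrong equality operator in the code, which the compiler did not catch. Most likely this is because inside a ScalaTest suite, === on plain values resolves to ScalaTest's own TripleEquals operator, whose implicit conversion is a method of the suite itself, so the lambda still captures the test class. In my case it was something like: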

// Wrong
df.filter(x => x.id === y)

// Right
df.filter(x => x.id == y)
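
A minimal sketch of why === can pull the suite into a closure, again assuming Scala 2.12 or later and using plain Java serialization instead of Spark. MiniTripleEquals is a hypothetical, heavily simplified stand-in for ScalaTest's TripleEquals; the only point it illustrates is that the implicit conversion behind === is an instance method of the suite. EqSuite, SuiteHelper and Rec are likewise hypothetical.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.language.implicitConversions

// Hypothetical stand-in for a non-serializable helper held by the suite.
class SuiteHelper

// Hypothetical, simplified stand-in for ScalaTest's TripleEquals:
// the implicit conversion is an instance method of the trait.
trait MiniTripleEquals {
  class Equalizer[A](left: A) {
    def ===(right: A): Boolean = left == right
  }
  implicit def convertToEqualizer[A](left: A): Equalizer[A] = new Equalizer(left)
}

final case class Rec(id: Int)

class EqSuite extends MiniTripleEquals with Serializable {
  val helper = new SuiteHelper // not Serializable, like AssertionsHelper

  // Compiles, but expands to this.convertToEqualizer(x.id).===(y),
  // so the lambda captures the whole EqSuite.
  def tripleEqualsFilter(y: Int): Rec => Boolean = x => x.id === y

  // Plain == needs nothing from the suite; only y is captured.
  def doubleEqualsFilter(y: Int): Rec => Boolean = x => x.id == y
}

object TripleEqualsCaptureDemo {
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val suite = new EqSuite
    println(isSerializable(suite.tripleEqualsFilter(1))) // false
    println(isSerializable(suite.doubleEqualsFilter(1))) // true
  }
}
```

Both filters return the same Boolean for a given record; only what the lambda captures differs, which is why the error survives even after all code is moved inside the test.
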
Piazza answered 27/4, 2021 at 21:6
