Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?
Take a look at this question: Scala + Spark - Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects.

Problem:

Suppose my mappers can be functions (def) that internally call other classes, create objects, and do different things inside. (They could even be classes that extend (Foo) => Bar and do the processing in their apply method, but let's ignore that case for now.)

Spark supports only Java serialization for closures. Is there ANY way out of this? Can we use something instead of closures to do what I want to do? We can easily do this sort of thing with Hadoop. This single thing is making Spark almost unusable for me. One cannot expect all 3rd-party libraries to have all their classes extend Serializable!
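The failure is reproducible without Spark at all: plain Java serialization refuses any object graph that reaches a non-Serializable class, and a closure that captures such an object drags it into the graph. A minimal plain-JVM sketch (class names are made up for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class NotSerializableDemo {
    // Stand-in for a 3rd-party class that never implements Serializable.
    static class ThirdPartyMapper {
        String transform(String s) { return s.toUpperCase(); }
    }

    static void serialize(Object o) throws IOException {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
        }
    }

    public static void main(String[] args) throws IOException {
        ThirdPartyMapper mapper = new ThirdPartyMapper();
        // A serializable closure that captures the non-serializable mapper,
        // just like a Spark map() closure capturing outside state:
        Function<String, String> f =
            (Function<String, String> & Serializable) mapper::transform;
        try {
            serialize(f);
            System.out.println("serialized fine");
        } catch (NotSerializableException e) {
            // This is the root cause Spark surfaces as "Task not serializable"
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

Serializing the closure fails because the captured ThirdPartyMapper is part of its serialized state; nothing about the closure itself is the problem.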

Probable Solutions:

Does something like this seem to be of any use: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

It certainly seems like a wrapper is the answer, but I cannot see exactly how.

Biotin answered 14/4, 2014 at 0:18 Comment(1)
Related: also avoid passing the SparkContext into an RDD map/filter/flatMap etc. transformation, which can give a similar error. – Pendley
I figured out how to do this myself!

You simply need to serialize the objects before passing them through the closure, and deserialize them afterwards. This approach just works, even if your classes aren't Serializable, because it uses Kryo behind the scenes. All you need is some curry. ;)

Here's an example of how I did it:

def genMapper(kryoWrapper: KryoSerializationWrapper[Foo => Bar])
             (foo: Foo): Bar = {
  kryoWrapper.value.apply(foo)
}

val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

// Note: an `object` cannot take constructor parameters, so Blah is a class:
class Blah(abc: ABC) extends (Foo => Bar) {
  def apply(foo: Foo): Bar = ??? // this is the real function
}

Feel free to make Blah as complicated as you want: a class, a companion object, nested classes, references to multiple 3rd-party libraries.

KryoSerializationWrapper refers to: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
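For reference, the pattern that wrapper implements can be sketched with nothing but the JDK. The real Shark class delegates to Kryo, which copes with many classes that don't implement Serializable; this stdlib-only sketch swaps in plain Java serialization just to show the shape. The wrapped value is kept transient, marshalled to bytes when the wrapper itself is serialized, and rebuilt on the other side (all names here are illustrative, not Shark's):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch only: Shark's KryoSerializationWrapper does the same dance with
// Kryo in place of ObjectOutputStream/ObjectInputStream.
class SerializationWrapper<T> implements Serializable {
    private transient T value; // never handed to default serialization
    private byte[] bytes;      // the explicit serialized form

    SerializationWrapper(T value) { this.value = value; }

    T value() { return value; }

    private void writeObject(ObjectOutputStream out) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value); // Kryo would write the value here instead
        }
        bytes = bos.toByteArray();
        out.defaultWriteObject();  // writes only `bytes`
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();    // restores `bytes`
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            @SuppressWarnings("unchecked")
            T v = (T) ois.readObject(); // Kryo would read the value here instead
            value = v;
        }
    }
}

public class WrapperDemo {
    public static void main(String[] args) throws Exception {
        SerializationWrapper<String> w = new SerializationWrapper<>("hello");
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { oos.writeObject(w); }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            SerializationWrapper<?> back = (SerializationWrapper<?>) ois.readObject();
            System.out.println(back.value()); // prints: hello
        }
    }
}
```

Because only the wrapper itself needs to implement Serializable, swapping the inner marshalling for Kryo lets the wrapped value be anything Kryo can handle.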

Biotin answered 14/4, 2014 at 6:51 Comment(3)
Another option would be to implement the Serializable interface in the Blah class itself. – Wagon
On second thought, the wrapper makes it more flexible: you can switch to a different type of serialization if you want to. – Wagon
@Wagon That was the point of the question. As it turns out, not only Blah but also the instance fields of Blah need to extend Serializable; this is obvious, since everything is stored recursively. What if your class uses 3rd-party libraries and modifying their code would be a nightmare? That's where something like this comes in handy. Moreover, Java serialization is slow. – Biotin
When using the Java API, you should avoid anonymous classes when passing the mapping function closure. Instead of doing map(new Function() { ... }), write a named class that extends your function and pass an instance of that to map(...). See: https://yanago.wordpress.com/2015/03/21/apache-spark/
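The reason the anonymous class bites is a hidden field: an anonymous (or inner) class compiled in an instance context holds a reference to its enclosing instance, so serializing the function drags the whole outer object along, while a static nested or top-level class has no such reference. A Spark-free sketch of the difference (class names are illustrative, not Spark's API):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class AnonymousVsNamed {

    interface SerializableFunction extends Function<Integer, Integer>, Serializable {}

    // Stand-in for the driver-side object the function is created in;
    // deliberately NOT serializable, like most application classes.
    static class Driver {
        int offset = 1;

        SerializableFunction anonymous() {
            // Anonymous class: using `offset` forces it to capture Driver.this.
            return new SerializableFunction() {
                @Override public Integer apply(Integer x) { return x + offset; }
            };
        }
    }

    // Named static class: no hidden reference to any enclosing instance.
    static class AddOne implements SerializableFunction {
        private final int offset = 1;
        @Override public Integer apply(Integer x) { return x + offset; }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Driver().anonymous())); // false: drags in Driver
        System.out.println(serializes(new AddOne()));             // true
    }
}
```

The same logic fails or succeeds purely based on where the class is declared, which is why moving the function into a named top-level or static nested class fixes the Spark error.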

Antiicer answered 22/3, 2015 at 16:19 Comment(5)
When you say a class extends your function: I wanted to extend VoidFunction, which is an interface, so I'm a bit confused here. Do I need to implement or extend? If I need to extend, do I need to actually create an interface? – Retiarius
In that particular example you need to extend PairFunction. No need to implement an interface; map takes (PairFunction<T, K2, V2> f) or (Function<T, U> f). – Antiicer
Would you please give an example here? – Femme
The site was suspended; is there an example you can give us? – Hightail
If I understand your answer correctly, no, this doesn't work. – Bullfighter

© 2022 - 2024 — McMap. All rights reserved.