Use Serializable lambda in Spark JavaRDD transformation
Asked Answered
L

2

6

I am trying to understand the following code.

// File: LambdaTest.java

package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class LambdaTest implements Ops {

  public static void main(String[] args) {
    new LambdaTest().job();
  }

  public void job() {
    SparkConf conf = new SparkConf()
      .setAppName(LambdaTest.class.getName())
      .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    List<Integer>              lst  = Arrays.asList(1, 2, 3, 4, 5, 6);
    JavaRDD<Integer>           rdd  = jsc.parallelize(lst);
    Function<Integer, Integer> func1 = (Function<Integer, Integer> & Serializable) x -> x * x;
    Function<Integer, Integer> func2 = x -> x * x;

    System.out.println(func1.getClass());  //test.LambdaTest$$Lambda$8/390374517
    System.out.println(func2.getClass());  //test.LambdaTest$$Lambda$9/208350681

    this.doSomething(rdd, func1);  // works
    this.doSomething(rdd, func2);  // org.apache.spark.SparkException: Task not serializable
  }
}

// File: Ops.java

package test;

import org.apache.spark.api.java.JavaRDD;
import java.util.function.Function;    

public interface Ops {

  default void doSomething(JavaRDD<Integer> rdd, Function<Integer, Integer> func) {
    rdd.map(x -> x + func.apply(x))
       .collect()
       .forEach(System.out::println);
  }

}

The difference is func1 is casted with a Serializable bound, while func2 is not.

When looking at the run time class of the two functions, they are both anonymous class under LambdaTest class

They are both used in an RDD transformation in an interface, then the two functions and LambdaTest should be serializable.

As you see, LambdaTest does not implement Serializable interface. So I think the two func should not work. But surprisingly, func1 works.

The stack trace for func2 is the following:

Serialization stack:
    - object not serializable (class: test.LambdaTest$$Lambda$9/208350681, value: test.LambdaTest$$Lambda$9/208350681@61d84e08)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$1024e30a$1:(Ljava/util/function/Function;Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$10/1470295349, fr.leboncoin.etl.jobs.test.Ops$$Lambda$10/1470295349@4e1459ea)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 19 more

It seems that if a function bound with Serializable, the object containing it need not to be serialized, which makes me confused.

Any explanation on this is highly appreciated.

------------------------------ Updates ------------------------------

I have tried to use abstract class instead of interface:

//File: AbstractTest.java

public class AbstractTest {

  public static void main(String[] args) {
    new AbstractTest().job();
  }

  public void job() {
    SparkConf conf = new SparkConf()
      .setAppName(AbstractTest.class.getName())
      .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    List<Integer>    lst = Arrays.asList(1, 2, 3, 4, 5, 6);
    JavaRDD<Integer> rdd = jsc.parallelize(lst);

    Ops ops = new Ops() {

      @Override
      public Integer apply(Integer x) {
        return x + 1;
      }
    };

    System.out.println(ops.getClass()); // class fr.leboncoin.etl.jobs.test.AbstractTest$1
    ops.doSomething(rdd);
  }
}

// File: Ops.java

public abstract class Ops implements Serializable{

  public abstract Integer apply(Integer x);

  public void doSomething(JavaRDD<Integer> rdd) {
    rdd.map(x -> x + apply(x))
       .collect()
       .forEach(System.out::println);
  }
}

It does not work either, even if Ops class is compiled in separate files with AbstractTest class. The ops object's class name is class fr.leboncoin.etl.jobs.test.AbstractTest$1. According to the following stack track, it seem that it needs to serialize AbstractTestin order to serialize AbstractTest$1.

Serialization stack:
    - object not serializable (class: test.AbstractTest, value: test.AbstractTest@21ac5eb4)
    - field (class: test.AbstractTest$1, name: this$0, type: class test.AbstractTest)
    - object (class test.AbstractTest$1, test.AbstractTest$1@36fc05ff)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$6d6228b6$1:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/208350681, fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/208350681@4acb2510)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 19 more
Lindbergh answered 4/8, 2015 at 11:2 Comment(0)
E
3

LambdaTest doesn't need to be Serializable as it's not being sent over the wire - there's no reason to do that.

On the other hand both func1 and func1 do have to be Serializable as Spark will be using them to perform computation (on the RDD and therefore this code will have to be sent over the wire to the worker nodes. Notice that even though you write it all in the same class, after compilation your lambdas will be put in separate files, thanks to that the whole class doesn't have to be sent over the wire -> the outer class doesn't need to be Serializable.

As for why fun1 works, when you do not use type casting the Java compiler will infer the type of a lambda expression for you. So in this case the code generated for fun2 will simply implement a Function (since that's the target variable's type). On the other hand if a type cannot be inferred from the context (like in your case, the compiler has no way of knowing that fun1 has to be Serializable since it's a feature required by Spark) you can use type casting as in your example to explicitly provide a type. In that case the code generated by the compiler will be implementing both the Function and Serializable interfaces and the compiler won't try to infer the type on it's own.

You can find it described in the state of lambda under 5. Contexts for target typing.

Edgewise answered 4/8, 2015 at 11:20 Comment(3)
I am not sure of what you said, "Notice that even though you write it all in the same class, after compilation your lambdas will be put in separate files, thanks to that the whole class doesn't have to be sent over the wire". I have update my post, with abstract class, instead of interface, knowing that abstract class is also compiled in a separated file.Lindbergh
@HaoRen well I did tell you that LambdaTest doesn't have to be serializable because it's not being serialized and sent over the wire. Why do you think it has to be? Only the objects that are used in RDD transformations have to be serializable as Spark is performing operations on them. That's why func1 and func2 have to implement SerializableEdgewise
LambdaTest is ok, I fully understand what you said about LambdaTest. Please check out the UPDATE on the original post I talked about in my last comment. I was not talking about LambdaTest... Actually, I use an abstract class not interface to do the same thing, which is also compiled in a separate file, since it is an anonymous class. It does not work. That makes me think your point on separate file is not true. You can also checkout a related question I ask on spark forum: mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/…. Feel free to ask for more details if needed.Lindbergh
B
-2

The above answer is correct. As for the additional abstract class question, the answer is the abstract class implemented in AbstractTest class is a inner class, which one has the reference of outclass. When serializing a object, it will serialize its fields, an outclass AbstractTest is not Serializable, thus it can't be serialized.

Black answered 26/5, 2022 at 1:42 Comment(1)
Hi, This does not seam to be an actual full answer to the question. Maybe rather use comments for additions to an answer.Pylorus

© 2022 - 2024 — McMap. All rights reserved.