JavaSparkContext not serializable

I'm using Spark with Cassandra, and I have a JavaRDD<String> of clients. For each client, I want to select their interactions from Cassandra, like this:

JavaPairRDD<String, List<InteractionByMonthAndCustomer>> a = client.mapToPair(new PairFunction<String, String, List<InteractionByMonthAndCustomer>>() {
        @Override
        public Tuple2<String, List<InteractionByMonthAndCustomer>> call(String s) throws Exception {               
            List<InteractionByMonthAndCustomer> b = javaFunctions(sc)
                    .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer")
                    .where("ctid =?", s)
                    .map(new Function<CassandraRow, InteractionByMonthAndCustomer>() {
                        @Override
                        public InteractionByMonthAndCustomer call(CassandraRow cassandraRow) throws Exception {
                            return new InteractionByMonthAndCustomer(cassandraRow.getString("channel"),
                                    cassandraRow.getString("motif"),
                                    cassandraRow.getDate("start"),
                                    cassandraRow.getDate("end"),
                                    cassandraRow.getString("ctid"),
                                    cassandraRow.getString("month")
                            );
                        }
                    }).collect();
            return new Tuple2<String, List<InteractionByMonthAndCustomer>>(s, b);
        }
    });

For this I'm using a single JavaSparkContext sc, but I get this error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99)
at org.apache.spark.api.java.JavaRDD.mapToPair(JavaRDD.scala:32)
at fr.aid.cim.spark.dao.GenrateCustumorJourney.AllCleintInteractions(GenrateCustumorJourney.java:91)
at fr.aid.cim.spark.dao.GenrateCustumorJourney.main(GenrateCustumorJourney.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 14 more

I think the JavaSparkContext needs to be serializable. How can I make it serializable, please?

Thank you.

Villiers answered 30/12, 2014 at 14:00

No, JavaSparkContext is not serializable, and it is not supposed to be: it cannot be used in a function you send to remote workers. Here you are not referencing it explicitly, but a reference to it is serialized anyway, because your anonymous inner class function is not static and therefore holds a reference to its enclosing class.

Try rewriting your code with this function as a static, stand-alone object.
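
For illustration, here is a minimal sketch of that advice (the class name comes from the stack trace; everything else is illustrative): a static, stand-alone PairFunction captures no reference to its enclosing class, so nothing unexpected ends up in the serialized closure. Note that the Cassandra lookup itself still cannot go inside such a function, because it needs the SparkContext; see the join-based approach in the next answer.

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class GenrateCustumorJourney {

    // Static, stand-alone function: it holds no reference to the enclosing
    // class, so no JavaSparkContext is dragged into the serialized closure.
    static class KeyByClientId implements PairFunction<String, String, String> {
        @Override
        public Tuple2<String, String> call(String ctid) {
            return new Tuple2<String, String>(ctid, ctid);
        }
    }

    // Usage on the driver:
    // JavaPairRDD<String, String> clientsById = client.mapToPair(new KeyByClientId());
}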

Cultural answered 30/12, 2014 at 15:10

You cannot use the SparkContext or create other RDDs from within an executor (that is, inside an RDD's map function).

You have to create the Cassandra RDD (sc.cassandraTable) in the driver and then join the two RDDs (the client RDD and the Cassandra table RDD).
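
A minimal sketch of that join-based approach, assuming Java 8 lambdas, the same javaFunctions/cassandraTable helpers already used in the question, and that InteractionByMonthAndCustomer is Serializable (it has to be shuffled); variable names other than those taken from the question are illustrative:

import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;
// javaFunctions(...) is the same CassandraJavaUtil static import used in the question.

// 1. Build the interactions RDD once, on the driver, keyed by client id (ctid).
JavaPairRDD<String, InteractionByMonthAndCustomer> interactionsByClient =
        javaFunctions(sc)
                .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer")
                .mapToPair(row -> new Tuple2<String, InteractionByMonthAndCustomer>(
                        row.getString("ctid"),
                        new InteractionByMonthAndCustomer(
                                row.getString("channel"),
                                row.getString("motif"),
                                row.getDate("start"),
                                row.getDate("end"),
                                row.getString("ctid"),
                                row.getString("month"))));

// 2. Key the client RDD by the same id so the two RDDs can be joined.
JavaPairRDD<String, String> clientsById =
        client.mapToPair(ctid -> new Tuple2<String, String>(ctid, ctid));

// 3. Join the driver-created RDDs: one entry per client with all of its
//    interactions. No SparkContext is referenced inside any shipped function.
JavaPairRDD<String, Iterable<InteractionByMonthAndCustomer>> interactionsPerClient =
        clientsById.join(interactionsByClient.groupByKey())
                   .mapValues(joined -> joined._2());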

Nika answered 30/12, 2014 at 15:13
Comment (Helminthology): True, the code should not work anyway (Spark forbids transformations inside transformations).

Declare it with the transient keyword:

private transient JavaSparkContext sparkContext;
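
A short sketch of how that pattern is usually used (the class and constructor here are hypothetical, not from the question): the transient field keeps Spark's closure serializer from trying to ship the context, and the field is only meaningful on the driver.

import java.io.Serializable;
import org.apache.spark.api.java.JavaSparkContext;

public class CustomerJourneyJob implements Serializable {

    // Not serialized with the rest of the object; only usable on the driver.
    private transient JavaSparkContext sparkContext;

    public CustomerJourneyJob(JavaSparkContext sc) {
        this.sparkContext = sc;
    }

    // Methods that run on the driver may use sparkContext;
    // functions shipped to executors must not touch it.
}
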
Helminthology answered 12/4, 2017 at 17:41
