Kryo serializer causing exception on underlying Scala class WrappedArray

Two questions; the answer to the general one will guide me on how minimal I can make an MVCE.

1) How can I know to register WrappedArray up front (and every other Scala class I might use)? Is it normal to have to register classes from libraries with Kryo?

and the specific one:

2) How do I fix this? (Willing to admit I might have something else screwy going on that is reflecting a false error here, so don't kill yourselves trying to reproduce this.)

DETAILS

Testing out a Spark program in Java using our custom classes related to genetics and statistics, on Spark 1.4.1 and Scala 2.11.5, with the following settings on SparkConf:

// the Kryo serializer wants all classes that need serializing registered up front
Class<?>[] kryoClassArray = new Class<?>[]{DropResult.class, DropEvaluation.class, PrintHetSharing.class};

SparkConf sparkConf = new SparkConf().setAppName("PipeLinkageData")
                <SNIP other settings to declare master>
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                //require registration of all classes with Kryo
                .set("spark.kryo.registrationRequired", "true")
                .registerKryoClasses(kryoClassArray);

Getting this error (repeated at end of long error listing):

Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);

But I never use that class in my code. I can add scala.collection.mutable.WrappedArray to the kryoClassArray, but it doesn't fix the problem. And if I add scala.collection.mutable.WrappedArray$ofRef.class (as suggested in the error), that's a syntax error in Java; apparently I can't reference that inner Scala class with a class literal?
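
For concreteness, a sketch of the registration attempts (illustrative only; the class names other than ours come straight from the error message):

// Registering the outer class compiles, but the error persists:
Class<?>[] kryoClassArray = new Class<?>[]{
        DropResult.class,
        DropEvaluation.class,
        PrintHetSharing.class,
        scala.collection.mutable.WrappedArray.class // accepted, doesn't help
};

// Following the error message literally does not compile in Java:
// kryoClassArray = new Class<?>[]{scala.collection.mutable.WrappedArray$ofRef.class};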

MVCE: I have started an MVCE, but the problem is that doing one with our classes requires external libraries and text/data files. Once I strip out our classes, I don't have the problem. If someone could answer the general question, it might help guide me on how much of an MVCE I can come up with.

As I was writing this question I got the go-ahead to update to Spark 1.5.2; I will see if there is any change there and update the question if so.

Short of an MVCE, here are my class declarations:

public class MVCEPipeLinkageInterface extends LinkageInterface implements Serializable {

class PrintHetSharing implements VoidFunction<DropResult> {

class SparkDoDrop implements Function<Integer, Integer> {

Full errors:

16/01/08 10:54:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/01/08 10:54:55 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:55646/user/Executor#214759698]) with ID 0
16/01/08 10:54:55 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.makeOffers(CoarseGrainedSchedulerBackend.scala:167)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:143)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
Letreece answered 12/1, 2016 at 6:03 (3 comments)
1. Knowing what classes will require serialization requires you to inspect your code and understand what it is doing (you only pasted the conf sample, not the usage). 2. Same as 1: impossible to answer without a code sample. – Woodpecker
Sure, @DanielL., I will edit in some code. But it sounds like you are saying I need to know every underlying class, as a general principle? I am writing Java, so I didn't expect to need an awareness of the underlying Scala classes to make Kryo work. Thanks – Letreece
@DanielL. I appreciate the MVCE request; the problem I run into is that doing one with our classes requires external libraries and text/data files. Once I strip out our classes and the need for our files, I don't have the problem. If someone could answer the general question it might help guide me on how much of an MVCE I can come up with. I do implement Serializable in all classes, either explicitly or by implementing Functions from Spark, i.e. importing org.apache.spark.api.java.function.Function and org.apache.spark.api.java.function.VoidFunction. – Letreece

In Scala you can fix this issue by adding scala.collection.mutable.WrappedArray.ofRef[_] as a registered class, as in the following snippet:

conf.registerKryoClasses(
  Array(
    ...
    classOf[Person],
    classOf[Array[Person]],
    ...
    classOf[scala.collection.mutable.WrappedArray.ofRef[_]]
  )
)
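
If you are calling this from Java, you can't write that inner class as a class literal; a sketch of a runtime lookup by binary name instead (untested, but Class.forName is the usual way to reach such a Scala class from Java; the other class names are the question's own):

// Java sketch: look the Scala inner class up by its binary name,
// since WrappedArray$ofRef cannot be written as a Java class literal.
try {
    sparkConf.registerKryoClasses(new Class<?>[]{
            DropResult.class,
            DropEvaluation.class,
            PrintHetSharing.class,
            Class.forName("scala.collection.mutable.WrappedArray$ofRef")
    });
} catch (ClassNotFoundException e) {
    throw new RuntimeException("WrappedArray$ofRef not on classpath", e);
}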
Foggia answered 24/5, 2016 at 16:03 (3 comments)
First, your answer looks like Scala but I am in Java, though I get the point :) I appreciate the answer, but the underlying question is unanswered: why should I have to declare this class when I don't use it? I don't have to declare every class in Spark, so why this one? I have not tried to use Kryo for a bit; I should re-implement it now that our solution is much farther down the road and Spark is a couple of versions newer. +1 though, thanks! – Letreece
Not sure why the java tag wasn't on the question; my bad, sorry. It was in the question but not the tags, oops. – Letreece
Accepted this answer now that I am revisiting this; it doesn't provide a complete answer, as this Scala code doesn't work in Java, but all the same it's closer to a specific answer. I could swear on another question someone posted how to add this Scala class into a Java array; using the example .ofRef[] or the Java-esque $ofRef both don't work. For now I have relaxed the "required" setting on Kryo. – Letreece

You don't need to make everything serializable, regardless of whether it is part of a client library or not. But you DO need to make any lambda that will take effect on the executors serializable. Those do not run on the master node, so there is no way to prevent serialization (nor would you want to, as the entire purpose of Spark is distributed computation).

For examples and such (and if you don't quite grasp the concept yet), check the official docs about this.
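
As an illustration only (this is not the asker's code): Spark's Java function interfaces extend java.io.Serializable, so implementing one in a standalone class keeps the closure small and serializable:

import org.apache.spark.api.java.function.Function;

// A standalone class implementing Spark's Function interface (which
// extends Serializable) avoids capturing an enclosing, possibly
// non-serializable, object in the closure shipped to executors.
public class AddOne implements Function<Integer, Integer> {
    @Override
    public Integer call(Integer x) {
        return x + 1;
    }
}

Used as rdd.map(new AddOne()), this serializes cleanly, whereas an anonymous inner class defined inside a non-serializable outer object would drag that object along.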

Woodpecker answered 13/1, 2016 at 20:11 (1 comment)
Thanks; by clearing up the general question you let me know where to focus my efforts, very useful! I am still a bit mystified as to why the Scala WrappedArray class is being reported as the one that can't be serialized. I will strip down my code and put it back together. I do understand anonymous functions and use them when using built-in classes; when I am using our classes I declare them separately. I will still work on an MVCE. Thanks again – Letreece
