Require Kryo serialization in Spark (Scala)
I have Kryo serialization turned on with this:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

I want to ensure that a custom class is serialized using Kryo when shuffled between nodes. I can register the class with Kryo this way:

conf.registerKryoClasses(Array(classOf[Foo]))

As I understand it, this does not actually guarantee that Kryo serialization is used; if a serializer is not available, Kryo will fall back to Java serialization.

To guarantee that Kryo serialization happens, I followed this recommendation from the Spark documentation:

conf.set("spark.kryo.registrationRequired", "true")

But this causes an IllegalArgumentException to be thrown ("Class is not registered") for a bunch of different classes which I assume Spark uses internally, for example the following:

org.apache.spark.util.collection.CompactBuffer
scala.Tuple3

Surely I do not have to manually register each of these individual classes with Kryo? These serializers are all defined in Kryo, so is there a way to automatically register all of them?

Natiha answered 13/7, 2015 at 21:53 Comment(0)

As I understand it, this does not actually guarantee that Kryo serialization is used; if a serializer is not available, Kryo will fall back to Java serialization.

No. If you set spark.serializer to org.apache.spark.serializer.KryoSerializer then Spark will use Kryo. If Kryo is not available, you will get an error. There is no fallback.

So what is this Kryo registration then?

When Kryo serializes an instance of an unregistered class it has to output the fully qualified class name. That's a lot of characters. Instead, if a class has been pre-registered, Kryo can just output a numeric reference to this class, which is just 1-2 bytes.

This is especially crucial when each row of an RDD is serialized with Kryo. You don't want to include the same class name for each of a billion rows. So you pre-register these classes. But it's easy to forget to register a new class and then you're wasting bytes again. The solution is to require every class to be registered:

conf.set("spark.kryo.registrationRequired", "true")

Now Kryo will never output full class names. If it encounters an unregistered class, that's a runtime error.
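
To make the size difference concrete, here is a small standalone (non-Spark) sketch. It only assumes Kryo itself is on the classpath; Foo is a hypothetical example class, not anything from the question:

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

case class Foo(x: Int)  // hypothetical stand-in for your own class

def serializedSize(register: Boolean): Int = {
  val kryo = new Kryo()
  kryo.setRegistrationRequired(false)        // allow unregistered classes, for the comparison
  if (register) kryo.register(classOf[Foo])  // registered classes are written as a small numeric id
  val out = new Output(4096)                 // in-memory buffer
  kryo.writeClassAndObject(out, Foo(42))     // writes class info followed by the fields
  out.toBytes.length
}

println(s"unregistered: ${serializedSize(false)} bytes")  // includes the full class name
println(s"registered:   ${serializedSize(true)} bytes")   // just the id plus the data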

Unfortunately it's hard to enumerate all the classes that you are going to be serializing in advance. The idea is that Spark registers the Spark-specific classes, and you register everything else. You have an RDD[(X, Y, Z)]? You have to register classOf[scala.Tuple3[_, _, _]].

The list of classes that Spark registers actually includes CompactBuffer, so if you see an error for that, you're doing something wrong. You are bypassing the Spark registration procedure. You have to use either spark.kryo.classesToRegister or spark.kryo.registrator to register your classes. (See the config options. If you use GraphX, your registrator should call GraphXUtils.registerKryoClasses.)
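
For the RDD[(X, Y, Z)] case above, either mechanism works. A minimal sketch of both, assuming a hypothetical user class com.example.Foo:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Option 1: list the class names in the config (fine for simple, public classes).
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .set("spark.kryo.classesToRegister", "com.example.Foo,scala.Tuple3")

// Option 2: a registrator class (more flexible: generics, arrays, private classes).
class AppRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(Class.forName("com.example.Foo"))  // hypothetical user class
    kryo.register(classOf[scala.Tuple3[_, _, _]])
  }
}
// conf.set("spark.kryo.registrator", classOf[AppRegistrator].getName)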

Leoleod answered 13/7, 2015 at 22:43 Comment(8)
I was using SparkConf.registerKryoClasses, which I was certain I found in the documentation somewhere, but am unable to find now. I will use the spark.kryo.classesToRegister setting. – Natiha
@Daniel Darabos: My model class has only getters and setters, which I want to register using Kryo. Do I need to register the data types used inside the model class as well, for example String? – Emarie
String is registered by default, as are all the primitive classes like Long. But in general you will need to register everything that is contained inside the class you want to serialize. You don't need to think too much though: if you did not register something, you will get an error message if you enabled spark.kryo.registrationRequired. – Leoleod
Daniel: I am being told to register org.apache.spark.sql.catalyst.InternalRow[].class and many similar classes. I am using Spark 1.5.1. Is there something I am missing in my setup? I have followed what you have stated. – Jidda
Yes, Spark has a bunch of classes that can get serialized, but that do not get automatically registered. This is quite a sad thing and a ticket is open about it: issues.apache.org/jira/browse/SPARK-6497. The workaround is to register those classes yourself. If the class is private you have to use Class.forName. If it's an array, you have to use the ugly Class.forName("[Lorg.apache.spark.SomePrivateClass;") syntax. We register about 150 classes in our code, most of which are Spark classes. (A sketch of these workarounds follows this comment thread.) – Leoleod
Thanks Daniel for confirming this. I have about 40 classes registered right now but am stuck trying to figure out how to register this one. Note: To register this class use: kryo.register(byte[][].class). Have you seen this one before? – Jidda
Not byte[][]. Have you tried kryo.register(classOf[Array[Array[Byte]]])? We have that for Double and Long. – Leoleod
That solution did not work. I have opened a new post on this specific issue. If you have any additional insight that would be great. Thanks! #37791446 – Jidda
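
Putting the workarounds from this comment thread together, here is a hedged sketch of a registrator that handles private classes and arrays. The exact class names depend on your Spark version:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class PrivateClassRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Private Spark classes: look them up by name.
    kryo.register(Class.forName("org.apache.spark.util.collection.CompactBuffer"))
    // Arrays of a class: JVM array syntax inside Class.forName.
    kryo.register(Class.forName("[Lorg.apache.spark.sql.catalyst.InternalRow;"))
    // Nested arrays such as byte[][] (though this reportedly did not
    // resolve the issue in the comment above).
    kryo.register(classOf[Array[Array[Byte]]])
  }
}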

Here is a method to quickly get all the class names that need to be registered.

import com.esotericsoftware.kryo.{Kryo, Registration}
import com.esotericsoftware.kryo.util.DefaultClassResolver
import org.apache.commons.lang3.reflect.FieldUtils
import org.apache.spark.serializer.KryoRegistrator

// Reflection helpers for reading and writing private fields.
implicit class FieldExtensions(private val obj: Object) extends AnyVal {
    def readFieldAs[T](fieldName: String): T = {
        FieldUtils.readField(obj, fieldName, true).asInstanceOf[T]
    }

    def writeField(fieldName: String, value: Object): Unit = {
        FieldUtils.writeField(obj, fieldName, value, true)
    }
}

// A class resolver that logs every class Kryo has to register implicitly,
// i.e. every class that was not pre-registered.
class LogClassResolver extends DefaultClassResolver {

    override def registerImplicit(t: Class[_]): Registration = {
        println(s"registerImplicitclasstype:${t.getName}")

        super.registerImplicit(t)
    }

    // Copy the internal state of the existing resolver via reflection,
    // so this logging resolver can replace it inside a live Kryo instance.
    def copyFrom(resolver: DefaultClassResolver): Unit = {
        this.kryo = resolver.readFieldAs("kryo")

        this.idToRegistration.putAll(resolver.readFieldAs("idToRegistration"))
        this.classToRegistration.putAll(resolver.readFieldAs("classToRegistration"))
        this.classToNameId = resolver.readFieldAs("classToNameId")
        this.nameIdToClass = resolver.readFieldAs("nameIdToClass")
        this.nameToClass = resolver.readFieldAs("nameToClass")
        this.nextNameId = resolver.readFieldAs("nextNameId")

        this.writeField("memoizedClassId", resolver.readFieldAs("memoizedClassId"))
        this.writeField("memoizedClassIdValue", resolver.readFieldAs("memoizedClassIdValue"))
        this.writeField("memoizedClass", resolver.readFieldAs("memoizedClass"))
        this.writeField("memoizedClassValue", resolver.readFieldAs("memoizedClassValue"))
    }
}

// Swap Kryo's default class resolver for the logging one above.
class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo): Unit = {
        val newResolver = new LogClassResolver
        newResolver.copyFrom(kryo.getClassResolver.asInstanceOf[DefaultClassResolver])
        FieldUtils.writeField(kryo, "classResolver", newResolver, true)
    }
}

Then you just need to register MyRegistrator in the Spark session:

val sparkSession = SparkSession.builder()
    .appName("Your_Spark_App")
    .config("spark.kryo.registrator", classOf[MyRegistrator].getTypeName)
    .getOrCreate()
    // all your spark logic will be added here

After that, submit a small sample Spark app to the cluster; all the class names that need registration will be printed to stdout. Then the following Linux commands will produce the class name list:

yarn logs --applicationId {your_spark_app_id} | grep registerImplicitclasstype >> type_names.txt
sort -u type_names.txt

Then register each class name in your registrator: kryo.register(Class.forName("class name")).
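
For instance, a minimal sketch of that final registrator, where the entries below are placeholders for the names you collected in type_names.txt:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class CollectedClassesRegistrator extends KryoRegistrator {
  // Paste the deduplicated names from type_names.txt here.
  private val classNames = Seq(
    "org.apache.spark.util.collection.CompactBuffer",  // placeholder
    "scala.Tuple3"                                     // placeholder
  )

  override def registerClasses(kryo: Kryo): Unit =
    classNames.foreach(name => kryo.register(Class.forName(name)))
}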

After that, you can add config("spark.kryo.registrationRequired", "true") to the Spark conf. Sometimes the YARN logs may get lost; if that happens, you can rerun the above process. P.S. The code above works for Spark version 2.1.2.

Enjoy.

Victim answered 12/4, 2019 at 4:16 Comment(0)

Based on what you're seeing, my best guess is that you're missing the statement:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

For the past few days I've also been struggling with converting serialization to Kryo, including for GraphX. This meant registering scala.Tuple3 with Kryo, apparently because Spark/GraphX code creates a Tuple3 when I do a 'sortBy'.

I've been adding a bunch of other classes, one by one, to the list to register with Kryo, mostly Scala and Spark classes I wouldn't have thought I'd need to add. I'm thinking/hoping there's got to be a better way to use Kryo with Spark.

Herringbone answered 14/7, 2015 at 0:18 Comment(1)
I have this statement. I will update my question to indicate that. – Natiha
