Here is a way to quickly find all the class names that need to be registered.
/**
 * Extension methods that let any object read or write one of its fields by name.
 *
 * Both methods delegate to `FieldUtils`, passing `true` as the last argument
 * (presumably the force-access flag of Apache Commons Lang's `FieldUtils` — confirm
 * against the file's imports).
 */
implicit class FieldExtensions(private val obj: Object) extends AnyVal {

  /**
   * Reads the field called `fieldName` from the wrapped object and casts it to `T`.
   *
   * @param fieldName name of the field to read
   * @tparam T type the caller expects; the cast is not checked here
   * @return the value of the field, viewed as `T`
   */
  def readFieldAs[T](fieldName: String): T = {
    val rawValue = FieldUtils.readField(obj, fieldName, true)
    rawValue.asInstanceOf[T]
  }

  /**
   * Stores `value` into the field called `fieldName` on the wrapped object.
   *
   * @param fieldName name of the field to overwrite
   * @param value     new value for the field
   */
  def writeField(fieldName: String, value: Object): Unit =
    FieldUtils.writeField(obj, fieldName, value, true)
}
/**
 * A `DefaultClassResolver` that prints the name of every class handed to
 * `registerImplicit` before delegating to the parent implementation.
 */
class LogClassResolver extends DefaultClassResolver {

  /**
   * Prints a `registerImplicitclasstype:` line carrying the class name, then
   * defers to the superclass.
   *
   * @param t class being implicitly registered
   * @return the registration produced by the superclass
   */
  override def registerImplicit(t: Class[_]): Registration = {
    println(s"registerImplicitclasstype:${t.getName}")
    super.registerImplicit(t)
  }

  /**
   * Copies the state of `resolver` into this instance, reading each piece
   * reflectively by field name.
   *
   * @param resolver resolver whose state should be taken over
   */
  def copyFrom(resolver: DefaultClassResolver): Unit = {
    kryo = resolver.readFieldAs("kryo")

    // Entries of these two maps are copied into this instance's own maps.
    idToRegistration.putAll(resolver.readFieldAs("idToRegistration"))
    classToRegistration.putAll(resolver.readFieldAs("classToRegistration"))

    // The remaining inherited fields are assigned directly from the source resolver.
    classToNameId = resolver.readFieldAs("classToNameId")
    nameIdToClass = resolver.readFieldAs("nameIdToClass")
    nameToClass = resolver.readFieldAs("nameToClass")
    nextNameId = resolver.readFieldAs("nextNameId")

    // The memoized fields are written by name through reflection
    // (presumably because they are not accessible from a subclass — confirm).
    Seq(
      "memoizedClassId",
      "memoizedClassIdValue",
      "memoizedClass",
      "memoizedClassValue"
    ).foreach { fieldName =>
      this.writeField(fieldName, resolver.readFieldAs[Object](fieldName))
    }
  }
}
/**
 * Registrator that replaces the class resolver of the supplied `Kryo` instance
 * with a `LogClassResolver` carrying over the existing resolver's state.
 */
class MyRegistrator extends KryoRegistrator {

  /**
   * Builds a `LogClassResolver`, copies the current resolver's state into it and
   * writes it into the `classResolver` field of `kryo` via reflection.
   *
   * @param kryo the Kryo instance whose resolver is replaced
   */
  override def registerClasses(kryo: Kryo): Unit = {
    val loggingResolver = new LogClassResolver
    val currentResolver = kryo.getClassResolver.asInstanceOf[DefaultClassResolver]
    loggingResolver.copyFrom(currentResolver)
    FieldUtils.writeField(kryo, "classResolver", loggingResolver, true)
  }
}
Then you just need to register MyRegistrator
in the Spark session:
// Build (or reuse) the session with MyRegistrator configured as the Kryo registrator.
val sparkSession = {
  val registratorClassName = classOf[MyRegistrator].getTypeName
  SparkSession
    .builder()
    .appName("Your_Spark_App")
    .config("spark.kryo.registrator", registratorClassName)
    .getOrCreate()
}
// all your spark logic will be added here
After that, submit a small sample Spark app to the cluster; all the class names that need registration will be printed to stdout. The following Linux commands will then produce the list of class names:
# Fetch the application's logs from YARN and append every line containing the
# "registerImplicitclasstype" marker to type_names.txt.
# Replace {your_spark_app_id} with the id of the submitted application.
yarn logs --applicationId {your_spark_app_id} | grep registerImplicitclasstype >> type_names.txt
# Print the collected lines sorted, with duplicates removed.
sort -u type_names.txt
Then register all the class names in your registrator:
// Repeat for each collected name; "class name" stands for one fully qualified class name from the list.
kryo.register(Class.forName("class name"))
After that, you can add config("spark.kryo.registrationRequired", "true")
to the spark conf.
Sometimes the YARN logs may get lost; if that happens, simply rerun the process above.
P.S. The code above works for Spark version 2.1.2.
Enjoy.