I enabled Kryo serialization for my Spark job, enabled the setting to require registration, and ensured all my types were registered.
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)
Wallclock-time performance of the job worsened by about 20% and the number of bytes shuffled increased by almost 400%.
This seems really surprising to me, given the Spark documentation's suggestion that Kryo should be better.
Kryo is significantly faster and more compact than Java serialization (often as much as 10x)
I manually invoked the serialize
method on instances of Spark's org.apache.spark.serializer.KryoSerializer
and org.apache.spark.serializer.JavaSerializer
with an example of my data. The results were consistent with the suggestions in the Spark documentation: Kryo produced 98 bytes; Java produced 993 bytes. That really is a 10x improvement.
A possibly confounding factor is that the objects which are being serialized and shuffled implement the Avro GenericRecord
interface. I tried registering the Avro schemas in the SparkConf
, but that showed no improvement.
I tried making new classes to shuffle the data which were simple Scala case class
es, not including any of the Avro machinery. It didn't improve the shuffle performance or number of bytes exchanged.
The Spark code ends up boiling down to following:
case class A(
f1: Long,
f2: Option[Long],
f3: Int,
f4: Int,
f5: Option[String],
f6: Option[Int],
f7: Option[String],
f8: Option[Int],
f9: Option[Int],
f10: Option[Int],
f11: Option[Int],
f12: String,
f13: Option[Double],
f14: Option[Int],
f15: Option[Double],
f16: Option[Double],
f17: List[String],
f18: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(f: Int) : AnyRef = ???
def put(f: Int, value: Any) : Unit = ???
def getSchema(): org.apache.avro.Schema = A.SCHEMA$
}
object A extends AnyRef with Serializable {
val SCHEMA$: org.apache.avro.Schema = ???
}
case class B(
f1: Long
f2: Long
f3: String
f4: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(field$ : Int) : AnyRef = ???
def getSchema() : org.apache.avro.Schema = B.SCHEMA$
def put(field$ : Int, value : Any) : Unit = ???
}
object B extends AnyRef with Serializable {
val SCHEMA$ : org.apache.avro.Schema = ???
}
def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = {
val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b)
joined.map { case (_, asAndBs) => asAndBs }
}
Do you have any idea what might be going on or how I could get the better performance that should be available from Kryo?
cogroup
. – Summary