Why is Spark performing worse when using Kryo serialization?
I enabled Kryo serialization for my Spark job, enabled the setting to require registration, and ensured all my types were registered.

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)
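
For illustration, the `classes` array passed to `registerKryoClasses` might look like the sketch below. This is a hypothetical example, not the asker's actual list: with `registrationRequired` set to `true`, the Scala wrapper types that case-class fields compile to (`Some`, `None`, `::`, `Nil`, and so on) typically need to be registered alongside the domain classes, or Kryo fails at runtime.

```scala
// Hedged sketch: A and B stand in for the asker's case classes below.
val classes: Array[Class[_]] = Array(
  classOf[A],
  classOf[B],
  classOf[scala.Some[_]],                       // Option fields
  scala.None.getClass,
  classOf[scala.collection.immutable.::[_]],    // List fields
  scala.collection.immutable.Nil.getClass
)
```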

Wallclock-time performance of the job worsened by about 20% and the number of bytes shuffled increased by almost 400%.

This seems really surprising to me, given the Spark documentation's suggestion that Kryo should be better.

Kryo is significantly faster and more compact than Java serialization (often as much as 10x)

I manually invoked the serialize method on instances of Spark's org.apache.spark.serializer.KryoSerializer and org.apache.spark.serializer.JavaSerializer with an example of my data. The results were consistent with the suggestions in the Spark documentation: Kryo produced 98 bytes; Java produced 993 bytes. That really is a 10x improvement.
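
The manual comparison can be sketched roughly as follows. This is an assumed reconstruction, not the asker's exact code; `record` is a placeholder instance, and `serialize` on a `SerializerInstance` returns a `java.nio.ByteBuffer` whose `limit()` gives the serialized size.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

val conf = new SparkConf()
val record = B(1L, 2L, "x", "y")  // hypothetical sample of the shuffled data

// Serialize the same object with each serializer and compare sizes.
val kryoBuf = new KryoSerializer(conf).newInstance().serialize(record)
val javaBuf = new JavaSerializer(conf).newInstance().serialize(record)
println(s"Kryo: ${kryoBuf.limit()} bytes, Java: ${javaBuf.limit()} bytes")
```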

A possibly confounding factor is that the objects which are being serialized and shuffled implement the Avro GenericRecord interface. I tried registering the Avro schemas in the SparkConf, but that showed no improvement.

I tried making new classes to shuffle the data which were simple Scala case classes, not including any of the Avro machinery. It didn't improve the shuffle performance or number of bytes exchanged.

The Spark code boils down to the following:

case class A(
    f1: Long,
    f2: Option[Long],
    f3: Int,
    f4: Int,
    f5: Option[String],
    f6: Option[Int],
    f7: Option[String],
    f8: Option[Int],
    f9: Option[Int],
    f10: Option[Int],
    f11: Option[Int],
    f12: String,
    f13: Option[Double],
    f14: Option[Int],
    f15: Option[Double],
    f16: Option[Double],
    f17: List[String],
    f18: String) extends org.apache.avro.specific.SpecificRecordBase {
  def get(f: Int) : AnyRef = ???
  def put(f: Int, value: Any) : Unit = ???
  def getSchema(): org.apache.avro.Schema = A.SCHEMA$
}
object A extends AnyRef with Serializable {
  val SCHEMA$: org.apache.avro.Schema = ???
}

case class B(
    f1: Long,
    f2: Long,
    f3: String,
    f4: String) extends org.apache.avro.specific.SpecificRecordBase {
  def get(field$ : Int) : AnyRef = ???
  def getSchema() : org.apache.avro.Schema = B.SCHEMA$
  def put(field$ : Int, value : Any) : Unit = ???
}
object B extends AnyRef with Serializable {
  val SCHEMA$ : org.apache.avro.Schema = ???
}

def join(as: RDD[A], bs: RDD[B]): RDD[(Iterable[A], Iterable[B])] = {
  val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b)
  joined.map { case (_, asAndBs) => asAndBs }
}

Do you have any idea what might be going on or how I could get the better performance that should be available from Kryo?

Summary answered 9/1, 2017 at 17:05 Comment(8)
Could you post an example case class and job? It would be much easier to answer the question then. – Loading
Good point, @T.Gawęd. Updated with simplified code. – Summary
How did you measure your code? – Chrisman
@YuvalItzchakov I measured performance as the number of records processed per unit of time, with the same number of workers in each trial. Across quite a few trials the trend was clear. I measured bytes shuffled by reading the value from the Spark UI for the stages that produce the input to the cogroup. – Summary
Can you make sure you registered everything used by setting sparkConf.set("spark.kryo.registrationRequired", "true")? – Howdah
@Howdah Yup, I've done that; it's in the block of code toward the top of my question. I got fatal exceptions in my program until I solved that. Thanks for the suggestion. – Summary
Do you mind updating your question with the Spark version and mode (local, yarn, etc.)? Also, are you committed to the Avro format? If you use a nested data structure like JSON, you can circumvent unnecessary shuffle operations. – Penstemon
@PaulBack It's Spark 1.5.1 running on YARN. It's joining a couple of data sources with a few billion records each, comprising multiple TB of data, so I suspect shuffling is unavoidable regardless of the data format. I'd love to hear how you'd avoid the shuffle in that case; it would save an enormous amount of server time if we could. – Summary
If your individual records are small and there are a huge number of them, that can make your job slow. Try increasing the buffer size and see whether it makes any improvement.

Try the following, if you haven't already:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Raise the Kryo buffer from the 0.064 MB default to 24 MB
  .set("spark.kryoserializer.buffer.mb", "24")

Ref: https://ogirardot.wordpress.com/2015/01/09/changing-sparks-default-java-serialization-to-kryo/

Officialdom answered 24/5, 2017 at 6:20 Comment(1)
What if I have a record that is 1 GB in size? – Antineutrino
Since you have high-cardinality RDDs, broadcasting/broadcast hash joining would unfortunately seem to be off limits.

Your best bet is to coalesce() your RDDs prior to joining. Are you seeing high skew in your shuffle times? If so, you may want to coalesce with shuffle=true.
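
For example (a sketch with hypothetical partition counts; tune them against your cluster):

```scala
// Merges existing partitions without a shuffle.
val compactedAs = as.coalesce(numPartitions = 512)

// Performs a full shuffle, which also evens out skewed partitions.
val rebalancedAs = as.coalesce(512, shuffle = true)
```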

Lastly, if you have RDDs of nested structures (e.g. JSON), that will sometimes allow you to bypass shuffles. Check out the slides and/or video here for a more detailed explanation.

Penstemon answered 30/4, 2017 at 16:59 Comment(0)
