Scope of 'spark.driver.maxResultSize'

I'm running a Spark job to aggregate data. I have a custom data structure called a Profile, which basically contains a mutable.HashMap[Zone, Double]. I want to merge all profiles that share a given key (a UUID), with the following code:

def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
  .aggregateByKey(new Profile(), 3200)(merge, merge).cache()
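
For context, Profile is shaped roughly like this (a simplified sketch: the real class has more to it, and Zone here is just a placeholder type):

import scala.collection.mutable

case class Zone(id: String)   // placeholder; the real Zone type is richer

class Profile extends Serializable {
  // Per-zone weights, as described above.
  val weights: mutable.HashMap[Zone, Double] = mutable.HashMap.empty

  // Merge the other profile into this one by summing weights per zone.
  def addWeights(other: Profile): Unit =
    other.weights.foreach { case (zone, w) =>
      weights(zone) = weights.getOrElse(zone, 0.0) + w
    }
}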

Curiously, Spark fails with the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

The obvious solution is to increase "spark.driver.maxResultSize", but two things puzzle me.

  1. It seems too much of a coincidence that the reported total (1024.0 MB) is exactly the same as the limit (1024.0 MB).
  2. All the documentation and help I found by googling this particular error and configuration parameter indicates that it affects actions that bring a value back to the driver (say take() or collect()), but I'm not bringing ANYTHING back to the driver; I'm just reading from HDFS, aggregating, and saving back to HDFS.
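
For reference, raising the limit itself is easy enough; what puzzles me is why it's needed at all. A minimal sketch of the two usual ways (the 2g value is just an example, and the app name is made up):

// On the command line:
//   spark-submit --conf spark.driver.maxResultSize=2g ...

// Or programmatically, before the SparkContext is created:
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("profile-aggregation")         // illustrative name
  .set("spark.driver.maxResultSize", "2g")   // default is 1g; 0 removes the limit
val sc = new SparkContext(conf)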

Does anyone know why I'm getting this error?

Monohydric asked 11/9, 2015 at 18:47 Comment(1)
I'll upvote it, but sadly I no longer have access to that code (or company), nor does your answer solve piece #2, which is that the operation should not be happening in the first place :-S – Monohydric

Yes, it's failing because the values shown in the exception message are rounded to one decimal place, while the actual comparison happens in bytes.

The serialized output must have been just over 1024 MB, but close enough to the limit that it still rounds down to 1024.0 MB in the message.

Check the Apache Spark code snippets below; it's an interesting and fairly rare way to hit this error. :)

In the check totalResultSize > maxResultSize, both values are Longs holding byte counts, but msg is built from the rounded strings returned by Utils.bytesToString().

//TaskSetManager.scala
  def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
    totalResultSize += size
    calculatedTasks += 1
    if (maxResultSize > 0 && totalResultSize > maxResultSize) {
      val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
        s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " +
        s"(${Utils.bytesToString(maxResultSize)})"
      logError(msg)
      abort(msg)
      false
    } else {
      true
    }
  }

Apache Spark 1.3 - source


//Utils.scala
  def bytesToString(size: Long): String = {
    val TB = 1L << 40
    val GB = 1L << 30
    val MB = 1L << 20
    val KB = 1L << 10

    val (value, unit) = {
      if (size >= 2*TB) {
        (size.asInstanceOf[Double] / TB, "TB")
      } else if (size >= 2*GB) {
        (size.asInstanceOf[Double] / GB, "GB")
      } else if (size >= 2*MB) {
        (size.asInstanceOf[Double] / MB, "MB")
      } else if (size >= 2*KB) {
        (size.asInstanceOf[Double] / KB, "KB")
      } else {
        (size.asInstanceOf[Double], "B")
      }
    }
    "%.1f %s".formatLocal(Locale.US, value, unit)
  }

Apache Spark 1.3 - source
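
To make the rounding concrete, here is a small standalone sketch (not Spark code; the byte counts are made up) that reproduces the confusing message:

import java.util.Locale

// Both sides of a comparison done in bytes can still print as the same
// "1024.0 MB" string once rounded to one decimal place.
object RoundingDemo {
  // Same formatting rule as Utils.bytesToString, reduced to the MB branch.
  def mb(size: Long): String =
    "%.1f MB".formatLocal(Locale.US, size.toDouble / (1L << 20))

  def main(args: Array[String]): Unit = {
    val maxResultSize   = 1L << 30              // the 1g limit = 1073741824 bytes
    val totalResultSize = maxResultSize + 5120  // hypothetical total, 5 KiB over the limit

    println(totalResultSize > maxResultSize)    // true: the byte-level check fails
    println(s"${mb(totalResultSize)} is bigger than ${mb(maxResultSize)}")
    // prints: 1024.0 MB is bigger than 1024.0 MB
  }
}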

Discard answered 12/11, 2016 at 16:59 Comment(0)
