SparkError: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)

Error:

ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)

Goal: obtain recommendations for all users from the model, overlap them with each user's test data, and compute an overlap ratio.

I have built a recommendation model using Spark MLlib. I evaluate the overlap ratio between each user's test items and recommended items, and compute the mean overlap ratio.

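  import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
  import org.apache.spark.rdd.RDD

  def overlapRatio(model: MatrixFactorizationModel, test_data: RDD[Rating]): Double = {

    // (user, all products that user has in the test set)
    val testData: RDD[(Int, Iterable[Int])] = test_data.map(r => (r.user, r.product)).groupByKey()
    val n = testData.count()

    // Top-20 recommended product ids per user; this is the call that fails
    val recommendations: RDD[(Int, Array[Int])] = model.recommendProductsForUsers(20)
      .mapValues(_.map(r => r.product))

    // Count the users whose recommendations contain at least one of their test items
    val overlaps = testData.join(recommendations).map(x => {
      val moviesPerUserInRecs = x._2._2.toSet
      val moviesPerUserInTest = x._2._1.toSet
      val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest)
      if (localHitRatio.nonEmpty) 1 else 0
    }).filter(x => x != 0).count()

    // overlaps and n are both Long, so convert before dividing to avoid integer division
    if (n == 0) 0.0 else overlaps.toDouble / n
  }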
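The quantity overlapRatio computes (the share of test users with at least one recommended item among their test items) can be checked on small local data with a plain-Scala sketch that needs no Spark. HitRatio and hitRatio are hypothetical names. Users with no recommendations count as misses because the denominator is the number of test users, and the division is done on Doubles, since dividing two Long counts in Scala truncates.

```scala
object HitRatio {
  // Fraction of test users for whom at least one recommended item
  // also appears in that user's held-out test items.
  def hitRatio(test: Map[Int, Set[Int]], recs: Map[Int, Seq[Int]]): Double =
    if (test.isEmpty) 0.0
    else {
      // A user with no entry in `recs` simply counts as a miss
      val hits = test.count { case (user, items) =>
        recs.get(user).exists(_.exists(items.contains))
      }
      hits.toDouble / test.size
    }

  def main(args: Array[String]): Unit = {
    val test = Map(1 -> Set(10, 11), 2 -> Set(20), 3 -> Set(30))
    val recs = Map(1 -> Seq(11, 12), 2 -> Seq(21, 22))
    println(hitRatio(test, recs)) // 0.3333333333333333 (only user 1 has a hit)
  }
}
```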

The problem is that this throws the maxResultSize error shown above. In my Spark configuration I increased maxResultSize as follows:

import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.driver.maxResultSize", "6g")

That didn't solve the problem. I raised it almost to the amount of memory allocated to the driver, yet the issue persisted. While the code was executing I watched the Spark job, and what I saw was a bit puzzling:

[Stage 281:==>   (47807 + 100) / 1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)
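A rough back-of-the-envelope reading of this log line, assuming every task returns a serialized result of about the same size and that "GB" in the log means 2^30 bytes. ResultSizeEstimate is a hypothetical name; the numbers are taken directly from the log.

```scala
object ResultSizeEstimate {
  // Values read off the log line; "GB" is assumed to mean 2^30 bytes
  val observedBytes: Double = 6.0 * 1024 * 1024 * 1024
  val finishedTasks: Int = 47809
  val totalTasks: Int = 1000000

  // Average serialized result size per finished task, in bytes
  val bytesPerTask: Double = observedBytes / finishedTasks

  // Projected total if all 1M tasks return results of a similar size
  val projectedGiB: Double = bytesPerTask * totalTasks / (1024.0 * 1024 * 1024)

  def main(args: Array[String]): Unit =
    println(f"~${bytesPerTask / 1024}%.0f KiB per task, ~${projectedGiB}%.0f GiB projected in total")
    // prints: ~132 KiB per task, ~125 GiB projected in total
}
```

Under that assumption, each individual result is small, but the number of tasks makes the total far larger than any realistic driver heap, so raising spark.driver.maxResultSize alone would not be expected to help.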

At that stage, the code is executing recommendForAll in spark-mllib's MatrixFactorizationModel, around line 277 (I'm not sure of the exact line number).

  private def recommendForAll(
      rank: Int,
      srcFeatures: RDD[(Int, Array[Double])],
      dstFeatures: RDD[(Int, Array[Double])],
      num: Int): RDD[(Int, Array[(Int, Double)])] = {
    val srcBlocks = blockify(rank, srcFeatures)
    val dstBlocks = blockify(rank, dstFeatures)
    val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
      case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
        val m = srcIds.length
        val n = dstIds.length
        val ratings = srcFactors.transpose.multiply(dstFactors)
        val output = new Array[(Int, (Int, Double))](m * n)
        var k = 0
        ratings.foreachActive { (i, j, r) =>
          output(k) = (srcIds(i), (dstIds(j), r))
          k += 1
        }
        output.toSeq
    }
    ratings.topByKey(num)(Ordering.by(_._2))
  }

The recommendForAll method is called from recommendProductsForUsers.
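To make the quoted method easier to follow, here is a local, single-machine sketch of the same idea. It is not Spark's implementation (which scores blocks of factors with matrix multiplication and then uses topByKey); BruteForceTopK and its methods are hypothetical names. Every user is scored against every product by the dot product of their factor vectors, so the work grows as users × products, which is why Spark distributes it with cartesian.

```scala
object BruteForceTopK {
  // Dot product of two factor vectors of the same rank
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => a(i) * b(i)).sum

  // Score every (user, product) pair, then keep the `num` best products per user
  def recommendForAll(
      users: Map[Int, Array[Double]],
      products: Map[Int, Array[Double]],
      num: Int): Map[Int, Seq[(Int, Double)]] =
    users.map { case (u, uf) =>
      val scored = products.toSeq.map { case (p, pf) => (p, dot(uf, pf)) }
      u -> scored.sortBy { case (_, score) => -score }.take(num)
    }

  def main(args: Array[String]): Unit = {
    val users = Map(1 -> Array(1.0, 0.0), 2 -> Array(0.0, 1.0))
    val products = Map(10 -> Array(1.0, 0.0), 20 -> Array(0.0, 1.0), 30 -> Array(0.5, 0.5))
    recommendForAll(users, products, 2).foreach { case (u, recs) =>
      println(s"$u -> ${recs.mkString(", ")}")
    }
  }
}
```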

But the method appears to be spinning off 1M tasks. The input data comes from 2000 part files, so I am confused about how it ends up with 1M tasks, and I think that might be the problem.

My question is: how can I actually resolve this problem? Without this approach it is really hard to calculate the overlap ratio or recall@K. This is on Spark 1.5 (Cloudera 5.5).

Snappish asked 2/12, 2015 at 5:25

The 2GB problem is not new to the Spark community: https://issues.apache.org/jira/browse/SPARK-6235

Regarding partitions larger than 2GB, try repartitioning your RDD (myRdd.repartition(parallelism)) into more partitions than your current level of parallelism, which reduces the size of each individual partition.

Regarding the number of tasks spawned (and hence partitions created), my hypothesis is that it might come from the srcBlocks.cartesian(dstBlocks) call, which produces an output RDD with z = (number of srcBlocks partitions) * (number of dstBlocks partitions) partitions.
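A minimal sketch of the partition arithmetic behind this hypothesis, written in plain Scala without Spark. It assumes, as RDD.cartesian does, one output partition per pair of input partitions, and it assumes blockify keeps the partition count of the factor RDDs. CartesianTasks, partitionPairs and taskCount are hypothetical names.

```scala
object CartesianTasks {
  // One output partition (and so one task) per (left partition, right partition) pair
  def partitionPairs(srcPartitions: Int, dstPartitions: Int): Seq[(Int, Int)] =
    for (i <- 0 until srcPartitions; j <- 0 until dstPartitions) yield (i, j)

  // Same count without materializing the pairs; Long avoids Int overflow for large inputs
  def taskCount(srcPartitions: Int, dstPartitions: Int): Long =
    srcPartitions.toLong * dstPartitions

  def main(args: Array[String]): Unit = {
    println(partitionPairs(2, 3))
    println(taskCount(1000, 1000)) // 1000000
  }
}
```

For instance, if the user and product factor RDDs each had 1000 partitions (an assumption; the question does not state this), the product would be exactly the 1,000,000 tasks in the question's log.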

In this case, you might consider using myRdd.coalesce(parallelism) instead of repartition: it reduces the number of partitions (and therefore of cartesian tasks) without a shuffle, avoiding the related partition-serialization problems.
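One way to apply the coalesce suggestion here, sketched under assumptions: rebuild the model around coalesced factor RDDs before calling recommendProductsForUsers, using MatrixFactorizationModel's public (rank, userFeatures, productFeatures) constructor. CoalescedRecommendations, coalescedModel and the partition counts are hypothetical. Fewer partitions means fewer cartesian tasks (and fewer task results reported to the driver) but larger blocks per task, so suitable values depend on executor memory and would need tuning.

```scala
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

object CoalescedRecommendations {
  // Hypothetical helper: same factors, fewer partitions, so the cartesian product
  // inside recommendProductsForUsers runs about userParts * productParts tasks.
  def coalescedModel(
      model: MatrixFactorizationModel,
      userParts: Int,
      productParts: Int): MatrixFactorizationModel =
    new MatrixFactorizationModel(
      model.rank,
      // coalesce narrows partitions without a shuffle; cache because each
      // factor partition is read many times by the cartesian product
      model.userFeatures.coalesce(userParts).cache(),
      model.productFeatures.coalesce(productParts).cache())
}
```

Usage would then look like `CoalescedRecommendations.coalescedModel(model, 50, 20).recommendProductsForUsers(20)`, with 50 and 20 as placeholder values.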

Boxhaul answered 12/9, 2017 at 11:06
