Error:
ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)
Goal: Generate recommendations for all users with the model, compare them with each user's test data, and compute an overlap ratio.
I have built a recommendation model using Spark MLlib. For each user I check whether the recommended items overlap with that user's test data, and from that I compute the mean overlap ratio.
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

def overlapRatio(model: MatrixFactorizationModel, test_data: RDD[Rating]): Double = {
  // Items each user interacted with in the test set
  val testData: RDD[(Int, Iterable[Int])] = test_data.map(r => (r.user, r.product)).groupByKey
  val n = testData.count
  // Top-20 recommended product ids per user
  val recommendations: RDD[(Int, Array[Int])] = model.recommendProductsForUsers(20)
    .mapValues(_.map(r => r.product))
  // Count users with at least one recommended item among their test items
  val overlaps = testData.join(recommendations).filter { case (_, (testItems, recItems)) =>
    val moviesPerUserInRecs = recItems.toSet
    val moviesPerUserInTest = testItems.toSet
    moviesPerUserInRecs.intersect(moviesPerUserInTest).nonEmpty
  }.count
  // Convert to Double before dividing; Long / Long would truncate the ratio
  if (n == 0) 0.0 else overlaps.toDouble / n
}
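To make the metric concrete, here is a minimal local sketch of the same computation on plain Scala collections, with no Spark involved. The object name `OverlapRatioSketch`, its `overlapRatio` helper, and the sample data are illustrative assumptions, not part of the actual job. It counts the users whose recommendations share at least one item with their test items and divides by the number of test users.

```scala
object OverlapRatioSketch {
  // Hypothetical helper: the same metric as the Spark function, on in-memory maps.
  // testItems:   user id -> item ids seen in the test set
  // recommended: user id -> recommended item ids
  def overlapRatio(testItems: Map[Int, Set[Int]], recommended: Map[Int, Set[Int]]): Double = {
    val n = testItems.size
    // Users with at least one recommended item among their test items;
    // users without any recommendations count as misses, like the inner join
    val hits = testItems.count { case (user, items) =>
      recommended.get(user).exists(recs => recs.intersect(items).nonEmpty)
    }
    // Divide as Double so the ratio is not truncated
    if (n == 0) 0.0 else hits.toDouble / n
  }

  def main(args: Array[String]): Unit = {
    val test = Map(1 -> Set(10, 11), 2 -> Set(20), 3 -> Set(30))
    val recs = Map(1 -> Set(11, 12), 2 -> Set(21), 3 -> Set(30, 31))
    // Users 1 and 3 have an overlap, user 2 does not: 2 of 3 users
    println(overlapRatio(test, recs))
  }
}
```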
But the problem is that it ends up throwing the maxResultSize error shown above. In my Spark configuration I did the following to increase maxResultSize:
val conf = new SparkConf()
conf.set("spark.driver.maxResultSize", "6g")
But that didn't solve the problem. I raised the limit almost to the amount of memory I allocate to the driver, yet the issue was not resolved. While the code was executing I kept an eye on my Spark job, and what I saw was a bit puzzling.
[Stage 281:==> (47807 + 100) / 1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)
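A rough back-of-the-envelope check on that log line, as a sketch only: 47,809 finished tasks account for 6.0 GB of serialized results, which is roughly 130 KB per task. Assuming that the remaining tasks return results of a similar size and that the reported GB are binary gigabytes (both are assumptions), all 1,000,000 tasks would add up to well over 100 GB, which would be consistent with raising the limit to 6g not helping. The object and method names below are illustrative.

```scala
object ResultSizeEstimate {
  // Assumes the log reports binary gigabytes (1024^3 bytes)
  val bytesPerGb: Double = 1024.0 * 1024.0 * 1024.0

  // Average serialized result size per finished task, in bytes
  def avgBytesPerTask(totalGb: Double, finishedTasks: Long): Double =
    totalGb * bytesPerGb / finishedTasks

  // Projected total in GB if every task returned an average-sized result
  def projectedTotalGb(totalGb: Double, finishedTasks: Long, totalTasks: Long): Double =
    avgBytesPerTask(totalGb, finishedTasks) * totalTasks / bytesPerGb

  def main(args: Array[String]): Unit = {
    // Numbers taken from the log line: 6.0 GB after 47809 of 1000000 tasks
    val avg = avgBytesPerTask(6.0, 47809L)
    val projected = projectedTotalGb(6.0, 47809L, 1000000L)
    println(f"average per task: ${avg / 1024}%.1f KB")
    println(f"projected for all tasks: $projected%.1f GB")
  }
}
```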
At this stage, Spark is executing the recommendForAll method of MatrixFactorizationModel in spark-mllib, around line 277 (I am not sure of the exact line number).
private def recommendForAll(
    rank: Int,
    srcFeatures: RDD[(Int, Array[Double])],
    dstFeatures: RDD[(Int, Array[Double])],
    num: Int): RDD[(Int, Array[(Int, Double)])] = {
  val srcBlocks = blockify(rank, srcFeatures)
  val dstBlocks = blockify(rank, dstFeatures)
  val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
    case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
      val m = srcIds.length
      val n = dstIds.length
      val ratings = srcFactors.transpose.multiply(dstFactors)
      val output = new Array[(Int, (Int, Double))](m * n)
      var k = 0
      ratings.foreachActive { (i, j, r) =>
        output(k) = (srcIds(i), (dstIds(j), r))
        k += 1
      }
      output.toSeq
  }
  ratings.topByKey(num)(Ordering.by(_._2))
}
The recommendForAll method is called from the recommendProductsForUsers method.
But it looks like the method is spinning off 1M tasks. The input data comes from 2000 part files, so I am confused about how it ended up with 1M tasks, and I think that might be the problem.
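One possible explanation for the task count, offered as a hedged sketch rather than a confirmed diagnosis: `cartesian` produces one partition, and therefore one task, for every pair of input partitions, so the stage size is the product of the two partition counts rather than the number of input part files. If the user and product feature RDDs each had 1,000 partitions (an assumption; the actual partition counts are not shown here), that would give exactly 1,000,000 tasks. The object and helper names below are hypothetical.

```scala
object CartesianTaskCount {
  // RDD.cartesian pairs every partition of one RDD with every partition of the
  // other, so the resulting stage has srcPartitions * dstPartitions tasks.
  def cartesianTasks(srcPartitions: Int, dstPartitions: Int): Long =
    srcPartitions.toLong * dstPartitions.toLong

  def main(args: Array[String]): Unit = {
    // Assumed partition counts. On a real model they could be read with
    // model.userFeatures.partitions.length and
    // model.productFeatures.partitions.length
    println(cartesianTasks(1000, 1000))
  }
}
```

Checking the two partition counts on the trained model would confirm or rule out this explanation.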
My question is: how can I actually resolve this problem? Without this approach it is really hard to calculate the overlap ratio or recall@K. This is on Spark 1.5 (Cloudera 5.5).