Spark MLlib - trainImplicit warning

I keep seeing these warnings when using trainImplicit:

WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB).
The maximum recommended task size is 100 KB.

The task size then keeps increasing. I tried calling repartition on the input RDD, but the warnings remain the same.
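For context, here is roughly how I call it from the spark-shell (a minimal sketch with a placeholder input path and parameter values, not my actual job):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Placeholder path and parsing -- the real input is an RDD of implicit feedback counts.
val ratings = sc.textFile("hdfs:///path/to/implicit_feedback.csv").map { line =>
  val Array(user, item, count) = line.split(',')
  Rating(user.toInt, item.toInt, count.toDouble)
}

// Repartitioning the input does not make the warnings go away.
val input = ratings.repartition(200)

// rank = 20, iterations = 10, lambda = 0.01, alpha = 40.0 (all placeholder values)
val model = ALS.trainImplicit(input, 20, 10, 0.01, 40.0)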

All of these warnings come from the ALS iterations, from flatMap and also from aggregate. For instance, this is the origin of the stage where flatMap shows the warnings (with Spark 1.3.0, but they also appear in Spark 1.3.1):

org.apache.spark.rdd.RDD.flatMap(RDD.scala:296)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

and from aggregate:

org.apache.spark.rdd.RDD.aggregate(RDD.scala:968)
org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)
Starnes asked 22/4, 2015 at 17:27

Comments:
Can you provide a data and code example? – Nervine
I'm surprised a modern framework thinks 208 KB is "large". Wondering what the rationale is for that... – Bicipital
This is the size of the task, not of the data. – Starnes
Most likely you have skewed data and it is putting more load on one task. – Zellner
Just out of curiosity: do you happen to have a static list converted to an RDD at some point, some list larger than 100 KB? – Leu
@HiteshDharamdasani I have similar problems and I happen to do that. Do you have an idea? It's a list of ~100k shapefile objects... – Glyceride
It seems that these warnings, at least for implicit feedback training, can be safely ignored. – Starnes
@ipoteka please see a code sample with a similar issue here. – Edbert

A similar problem was described on the Apache Spark user mailing list: http://apache-spark-user-list.1001560.n3.nabble.com/Large-Task-Size-td9539.html

I think you can try playing with the number of partitions (using the repartition() method), depending on how many hosts you have and how much RAM and how many CPUs each one has.
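For example, something along these lines (the partition and block counts below are arbitrary placeholders for illustration, and ratings stands for your input RDD[Rating]; tune the numbers to your cluster):

import org.apache.spark.mllib.recommendation.ALS

// Arbitrary numbers for illustration -- adjust to your number of executors, cores and RAM.
val input = ratings.repartition(100)

val model = new ALS()
  .setImplicitPrefs(true)   // implicit feedback, as with trainImplicit
  .setRank(20)
  .setIterations(10)
  .setLambda(0.01)
  .setAlpha(40.0)
  .setBlocks(100)           // number of user/product blocks used inside ALS
  .run(input)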

Also try to investigate all the steps via the Web UI, where you can see the number of stages, the memory usage of each stage, and the data locality.

Or simply ignore these warnings as long as everything runs correctly and fast.

This warning is hard-coded in Spark (scheduler/TaskSetManager.scala):

      if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
          !emittedTaskSizeWarning) {
        emittedTaskSizeWarning = true
        logWarning(s"Stage ${task.stageId} contains a task of very large size " +
          s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
          s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
      }

and the threshold is defined as:

private[spark] object TaskSetManager {
  // The user will be warned if any stages contain a task that has a serialized size greater than
  // this.
  val TASK_SIZE_TO_WARN_KB = 100
} 
Chapman answered 15/6, 2015 at 12:45
