Spark MLlib - trainImplicit warning

Posted 2019-03-24 15:52

Question:

I keep seeing these warnings when using trainImplicit:

WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB).
The maximum recommended task size is 100 KB.

The reported task size then keeps increasing. I tried calling repartition on the input RDD, but the warnings persist.

All of these warnings come from the ALS iterations, from both flatMap and aggregate. For instance, this is the origin of the stage where flatMap triggers the warning (with Spark 1.3.0, but the warnings also appear in Spark 1.3.1):

org.apache.spark.rdd.RDD.flatMap(RDD.scala:296)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

and from aggregate:

org.apache.spark.rdd.RDD.aggregate(RDD.scala:968)
org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

Answer 1:

A similar problem was described on the Apache Spark user mailing list: http://apache-spark-user-list.1001560.n3.nabble.com/Large-Task-Size-td9539.html

I think you can try adjusting the number of partitions (using the repartition() method), depending on how many hosts and how much RAM and how many CPUs you have.
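As a rough illustration of the partitioning suggestion above, here is a minimal sketch written against the Spark 1.3 MLlib API. The object name RepartitionBeforeAls, the helper partitionsFor, and the rank, iteration, lambda and alpha values are all assumptions made up for this example. The multiplier of 3 follows the Spark tuning guide's rule of thumb of 2-3 tasks per CPU core. It is not a guaranteed fix: the asker already reports that repartitioning alone did not change the warnings.

```scala
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

object RepartitionBeforeAls {
  // Hypothetical helper: pick a partition count from the total number of cores.
  // The default of 3 tasks per core is a rule of thumb, not a Spark requirement.
  def partitionsFor(totalCores: Int, tasksPerCore: Int = 3): Int =
    math.max(1, totalCores * tasksPerCore)

  def train(ratings: RDD[Rating], totalCores: Int): MatrixFactorizationModel = {
    val numPartitions = partitionsFor(totalCores)

    // Repartition the input and cache it so the shuffled data is not
    // recomputed if it is read more than once.
    val input = ratings.repartition(numPartitions).cache()

    // Builder-style equivalent of ALS.trainImplicit; the hyperparameters
    // below are placeholder values chosen only for illustration.
    new ALS()
      .setImplicitPrefs(true)
      .setRank(10)
      .setIterations(10)
      .setLambda(0.01)
      .setAlpha(1.0)
      .setBlocks(numPartitions) // number of blocks ALS uses to parallelize
      .run(input)
  }
}
```

Calling RepartitionBeforeAls.train(ratings, totalCores = 8) would repartition into 24 partitions before training. Whether this changes the reported task size depends on the data and the cluster, so it is worth comparing the stages in the Web UI before and after.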

Also try inspecting each step in the Web UI, where you can see the number of stages, the memory usage of each stage, and data locality.

Or simply ignore these warnings as long as everything works correctly and fast.

The warning and its threshold are hard-coded in Spark (scheduler/TaskSetManager.scala):

      if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
          !emittedTaskSizeWarning) {
        emittedTaskSizeWarning = true
        logWarning(s"Stage ${task.stageId} contains a task of very large size " +
          s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
          s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
      }

The threshold itself is defined in the companion object in the same file:

private[spark] object TaskSetManager {
  // The user will be warned if any stages contain a task that has a serialized size greater than
  // this.
  val TASK_SIZE_TO_WARN_KB = 100
}
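
To make the check above concrete, here is a small standalone sketch that reproduces only the size comparison and the message format from the snippet, without any Spark dependency. The object TaskSizeCheck and the function warningFor are hypothetical names invented for this example; the emittedTaskSizeWarning flag, which limits the warning to a single emission, is deliberately left out.

```scala
object TaskSizeCheck {
  // Same value as TaskSetManager.TASK_SIZE_TO_WARN_KB in the snippet above.
  val TaskSizeToWarnKb = 100

  // Returns the warning text if the serialized task is strictly larger
  // than the threshold, and None otherwise.
  def warningFor(stageId: Int, serializedBytes: Int): Option[String] =
    if (serializedBytes > TaskSizeToWarnKb * 1024)
      Some(s"Stage $stageId contains a task of very large size " +
        s"(${serializedBytes / 1024} KB). The maximum recommended task size is " +
        s"$TaskSizeToWarnKb KB.")
    else
      None
}
```

With the numbers from the question, TaskSizeCheck.warningFor(246, 208 * 1024) yields the same message as in the log, while a task of exactly 100 KB yields None because the comparison is strict. The check only logs; it does not reject or slow down the task.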