Spark MLlib - trainImplicit 警告

14

在使用 trainImplicit 时,我一直看到这些警告:

WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB).
The maximum recommended task size is 100 KB.

然后任务大小开始增加。我尝试在输入RDD上调用repartition,但警告仍然相同。

所有这些警告都来自ALS迭代,来自flatMap和aggregate,例如在flatMap显示这些警告的阶段的起源(使用Spark 1.3.0,但它们也会在Spark 1.3.1中显示):

org.apache.spark.rdd.RDD.flatMap(RDD.scala:296)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

而来自聚合:

org.apache.spark.rdd.RDD.aggregate(RDD.scala:968)
org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

1
我很惊讶一个现代框架认为208KB是“大”的。不知道背后的理由会是什么... - Paul
1
这是任务的大小,而不是数据的大小。 - Tarantula
很可能您的数据存在偏斜,这会给一个任务带来更多的负载。 - ayan guha
@HiteshDharamdasani 我有类似的问题,我也做了那个。你有什么想法吗? 这是一个约100k个shapefile对象的列表。 - fanfabbb
2
这些问题似乎至少对于隐式反馈训练来说可以安全地忽略。 - Tarantula
显示剩余3条评论
1个回答

1

类似的问题在Apache Spark邮件列表中有描述 - http://apache-spark-user-list.1001560.n3.nabble.com/Large-Task-Size-td9539.html

我认为您可以尝试调整分区数量(使用 repartition() 方法),具体取决于您拥有多少主机、RAM和CPU。

还可以通过Web UI调查所有步骤,其中您可以看到每个阶段的内存使用情况、数据局部性的数量等信息。

或者,除非一切工作得正确且快速,否则不要理会这些警告。

此通知是在Spark中硬编码的(scheduler / TaskSetManager.scala

      if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
          !emittedTaskSizeWarning) {
        emittedTaskSizeWarning = true
        logWarning(s"Stage ${task.stageId} contains a task of very large size " +
          s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
          s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
      }

.

private[spark] object TaskSetManager {
  // The user will be warned if any stages contain a task that has a serialized size greater than
  // this.
  val TASK_SIZE_TO_WARN_KB = 100
} 

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接