为什么sortBy转换会触发Spark作业?

11
根据Spark文档,只有RDD操作才能触发Spark作业,而转换是惰性评估的,只有在对其调用操作时才会执行。
我发现sortBy转换函数会立即应用并显示为SparkUI中的作业触发器。为什么?
2个回答

15

sortBy 是通过使用 sortByKey 实现的,而 sortByKey 又依赖于 RangePartitioner(JVM)或分区函数(Python)。当您调用 sortBy / sortByKey 时,分区器(分区函数)会被急切地初始化并对输入 RDD 进行采样以计算分区边界。您看到的作业就是这个过程所对应的。

实际排序只有在您对新创建的 RDD 或其子代执行动作时才会执行。


5
根据Spark文档,只有操作会在Spark中触发作业,转换是在调用操作时惰性地评估的。一般情况下你是正确的,但正如你刚刚经历的那样,有一些例外,其中包括sortBy和zipWithIndex。事实上,这已经在Spark的JIRA中报告并被关闭,解决方案是不修复。请参见SPARK-1021 sortByKey() launches a cluster job when it shouldn't。您可以通过启用DAGScheduler日志记录(以及稍后在Web UI中)来查看作业运行情况。
scala> sc.parallelize(0 to 8).sortBy(identity)
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25)
INFO DAGScheduler: Parents of final stage: List()
INFO DAGScheduler: Missing parents: List()
DEBUG DAGScheduler: submitStage(ResultStage 1)
DEBUG DAGScheduler: missing: List()
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1)
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25)
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4)
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接