Spark SQL:为什么一个查询需要两个作业?

17

实验

我在 Spark 1.6.1 上尝试了以下代码片段。

val soDF = sqlContext.read.parquet("/batchPoC/saleOrder") # This has 45 files
soDF.registerTempTable("so")
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/out/")

物理计划是:

== Physical Plan ==
Sort [cnt#59L ASC], true, 0
+- ConvertToUnsafe
   +- Exchange rangepartitioning(cnt#59L ASC,200), None
      +- ConvertToSafe
         +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Final,isDistinct=false)], output=[dpHour#38,cnt#59L])
            +- TungstenExchange hashpartitioning(dpHour#38,200), None
               +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Partial,isDistinct=false)], output=[dpHour#38,count#63L])
                  +- Scan ParquetRelation[dpHour#38] InputPaths: hdfs://hdfsNode:8020/batchPoC/saleOrder

针对此查询,我得到了两个作业:Job 9Job 10 enter image description here

对于Job 9DAG为:

enter image description here

对于Job 10DAG为:

enter image description here

观察

  1. 显然,一个查询有两个 jobs
  2. Job 10跳过了Stage-16(在Job 9中标记为Stage-14)。
  3. Stage-15的最后一个RDD[48]Stage-17的最后一个RDD[49]相同。是如何做到的?我看到日志中在执行Stage-15后,将RDD[48]注册为RDD[49]
  4. Stage-17driver-logs中显示,但未在Executors上执行。在driver-logs中显示了任务执行,但当我查看Yarn容器的日志时,没有任何从Stage-17接收到的task证据。

支持这些观察的日志(仅driver-logs,由于后来崩溃导致我失去了executor日志)。可以看到,在Stage-17开始之前,已注册了RDD[49]

16/06/10 22:11:22 INFO TaskSetManager: Finished task 196.0 in stage 15.0 (TID 1121) in 21 ms on slave-1 (199/200)
16/06/10 22:11:22 INFO TaskSetManager: Finished task 198.0 in stage 15.0 (TID 1123) in 20 ms on slave-1 (200/200)
16/06/10 22:11:22 INFO YarnScheduler: Removed TaskSet 15.0, whose tasks have all completed, from pool 
16/06/10 22:11:22 INFO DAGScheduler: ResultStage 15 (parquet at <console>:26) finished in 0.505 s
16/06/10 22:11:22 INFO DAGScheduler: Job 9 finished: parquet at <console>:26, took 5.054011 s
16/06/10 22:11:22 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO SparkContext: Starting job: parquet at <console>:26
16/06/10 22:11:22 INFO DAGScheduler: Registering RDD 49 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Got job 10 (parquet at <console>:26) with 25 output partitions
16/06/10 22:11:22 INFO DAGScheduler: Final stage: ResultStage 18 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Submitting ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26), which has no missing parents
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 17.4 KB, free 512.3 KB)
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 8.9 KB, free 521.2 KB)
16/06/10 22:11:22 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 172.16.20.57:44944 (size: 8.9 KB, free: 517.3 MB)
16/06/10 22:11:22 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006
16/06/10 22:11:22 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26)
16/06/10 22:11:22 INFO YarnScheduler: Adding task set 17.0 with 200 tasks
16/06/10 22:11:23 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 1125, slave-1, partition 0,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 1.0 in stage 17.0 (TID 1126, slave-2, partition 1,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 2.0 in stage 17.0 (TID 1127, slave-1, partition 2,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 3.0 in stage 17.0 (TID 1128, slave-2, partition 3,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 4.0 in stage 17.0 (TID 1129, slave-1, partition 4,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 5.0 in stage 17.0 (TID 1130, slave-2, partition 5,NODE_LOCAL, 1988 bytes)

问题

  1. 为什么会有两个 Jobs?将一个 DAG 分成两个 jobs 的意图是什么?
  2. Job 10DAG 看起来已经可以完成查询执行了,Job 9 正在做些什么特别的吗?
  3. 为什么 Stage-17 没有被跳过?看起来创建了一些虚拟的 tasks,它们有任何用处吗?
  4. 后来我尝试了另一个相对简单的查询。出乎意料地,它创建了 3 个 Jobs

    sqlContext.sql("select dpHour from so order by dphour").write.parquet("/out2/")


我的观察是RDD的东西更容易理解,大多数文档都是基于它的。相比之下,DF的东西确实更难与Job、App等初始讨论相关联。 - thebluephantom
1个回答

15

当您使用高级的dataframe/dataset API时,执行计划包括作业/阶段分块是由Spark自行确定的。这些依赖于许多因素,例如执行并行度、缓存/持久化数据结构等。在Spark的未来版本中,随着优化器的复杂度增加,您可能会看到更多的查询作业,例如某些数据源被采样以参数化基于成本的执行优化。

例如,我经常看到写操作与涉及洗牌的处理生成不同的作业,但并非总是如此。

总之,如果您使用高级API,除非必须对大量数据进行极其详细的优化,否则很少有必要深入了解特定的分块。作业启动成本与处理/输出相比极低。

另一方面,如果您对Spark内部机制感到好奇,请阅读优化器代码并参与Spark开发者邮件列表。


2
这很奇怪,为什么第二个任务阶段不能在第一个任务中进行? - Thomas Decaux
1
好问题。这可能与中间结果生成有关。重要的问题是:为什么 DAG 映射到阶段和作业的方式很重要? - Sim
1
是的,很难真正理解Spark是如何做到这一点的,它是利用可用资源、数据等的混合。 - Thomas Decaux
我的观察是RDD的东西更容易理解,大多数文档都是基于它的。DF的东西确实更难与Job、App等初始讨论相关联。 - thebluephantom
@thebluephantom RDD计划更容易遵循,因为没有优化:您编写的就是Spark执行的内容。缺点显而易见:没有优化,没有高级SQL(类似)操作,更大的序列化/反序列化开销等。这就是为什么在大多数情况下,使用数据集比RDD更快地执行Spark,即使数据集在底层使用RDD。 - Sim
可以,但是这段话都是关于工作、应用程序等的,与深度优先搜索确实有点不同。谢谢。 - thebluephantom

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接