Spark SQL:为什么在Spark UI中看到了三个作业而不是一个单一的作业?

5
根据我的理解,Spark中每个action都会有一个作业。
但是我经常看到一个单独的操作触发了多个作业。
我试图通过对数据集进行简单的聚合来测试这一点,以获取每个类别(这里是“subject”字段)的最大值。
在检查Spark UI时,我可以看到有3个“jobs”执行groupBy操作,而我原本只期望有一个。
有人能帮助我理解为什么不是1而是3吗?
   students.show(5)

    +----------+--------------+----------+----+-------+-----+-----+
    |student_id|exam_center_id|   subject|year|quarter|score|grade|
    +----------+--------------+----------+----+-------+-----+-----+
    |         1|             1|      Math|2005|      1|   41|    D|
    |         1|             1|   Spanish|2005|      1|   51|    C|
    |         1|             1|    German|2005|      1|   39|    D|
    |         1|             1|   Physics|2005|      1|   35|    D|
    |         1|             1|   Biology|2005|      1|   53|    C|
    |         1|             1|Philosophy|2005|      1|   73|    B|
    

  // Task : Find Highest Score in each subject
  val highestScores = students.groupBy("subject").max("score")
  highestScores.show(10)

+----------+----------+
|   subject|max(score)|
+----------+----------+
|   Spanish|        98|
|Modern Art|        98|
|    French|        98|
|   Physics|        98|
| Geography|        98|
|   History|        98|
|   English|        98|
|  Classics|        98|
|      Math|        98|
|Philosophy|        98|
+----------+----------+
only showing top 10 rows

在检查Spark用户界面时,我发现针对groupBy操作执行了3个“作业”,而我原本期望只有一个。 enter image description here enter image description here 有谁能帮助我理解为什么会出现3个而不是1个?
== Physical Plan ==
*(2) HashAggregate(keys=[subject#12], functions=[max(score#15)])
+- Exchange hashpartitioning(subject#12, 1)
   +- *(1) HashAggregate(keys=[subject#12], functions=[partial_max(score#15)])
      +- *(1) FileScan csv [subject#12,score#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/lab/SparkLab/files/exams/students.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<subject:string,score:int>

@mazaneicha,没错,但是highestScores.show(10)会触发3个作业,请查看UI界面。 - Remis Haroon - رامز
2个回答

2

我认为只有#3才执行实际的“工作”(如果您在SQL选项卡上打开查询的详细信息,将会看到执行计划)。其他两个步骤是准备工作--

  • #1正在查询NameNode以构建InMemoryFileIndex以读取您的csv文件,
  • #2正在对数据集进行抽样以执行.groupBy("subject").max("score"),这在内部需要sortByKey (这里有更多细节)。

谢谢你的回答,它帮助我更好地理解了Spark的工作原理。 - Remis Haroon - رامز

0

我建议检查物理计划 -

highestScores.explain()

你可能会看到类似这样的内容-

*(2) HashAggregate(keys=[subject#9], functions=[max(score#12)], output=[subject#9, max(score)#51])
+- Exchange hashpartitioning(subject#9, 2)
   +- *(1) HashAggregate(keys=[subject#9], functions=[partial_max(score#12)], output=[subject#9, max#61])
  1. [映射阶段] 阶段#1 是实现本地聚合(部分聚合),然后使用 hashpartitioning(subject) 进行洗牌。请注意,hashpartitioner 使用 group by 列。
  2. [归约阶段] 阶段#2 是将阶段#1 的输出合并以获得最终的 max(score)
  3. 这实际上用于打印前10条记录 show(10)

谢谢您的回答,这种行为有文档记录吗?我在哪里可以了解更多相关信息? - Remis Haroon - رامز
还可以参考以下链接:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan-HashAggregateExec.html - Som
1
其实不确定那是否相关。一般来说,执行计划中显示的代码生成阶段和提交的作业不必匹配。 - mazaneicha
你的意思是这两个不同吗?我认为,我们将会在执行spark应用程序时为每个操作看到一个作业,对吗?上述的执行计划只会显示涉及计算“groupBy...max...”的阶段。 - Som

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接