在Spark中,阶段如何分成任务?

180

假设以下情况:每次只有一个Spark作业在运行。

我目前了解到的

这是我对Spark中发生的事情的理解:

  1. 创建SparkContext时,每个工作节点启动一个执行器。执行器是单独的进程(JVM),连接回驱动程序。每个执行器都有驱动程序的jar包。退出驱动程序会关闭执行器。每个执行器可以持有一些分区。
  2. 当执行作业时,根据血统图创建执行计划。
  3. 执行作业被拆分成阶段,其中每个阶段包含尽可能多的相邻(在血统图中)转换和操作,但没有洗牌。因此阶段由洗牌分隔。

image 1

我了解到

  • 任务是通过序列化函数对象从驱动程序发送到执行器的命令。
  • 执行器使用驱动程序的jar包对命令(任务)进行反序列化并在某个分区上执行它。

但是

问题

如何将阶段拆分为任务?

具体来说:

  1. 任务是由转换和操作确定的,还是可以在一个任务中有多个转换/操作?
  2. 任务是由分区确定的(例如,每个阶段每个分区一个任务)。
  3. 任务是由节点确定的(例如,每个阶段每个节点一个任务)。

我认为的(仅部分答案,即使正确也一样)

https://0x0fff.com/spark-architecture-shuffle中,使用图像解释了洗牌。

在此输入图像描述

我的印象是每个阶段分成 #number-of-partitions 个任务,不考虑节点数量。

每个阶段分成 #number-of-partitions 个任务,不考虑节点数量。

对于第一张图片,我会有3个映射任务和3个归约任务。

对于来自0x0fff的图片,假设只有三个橙色和三个深绿色文件,则有8个映射任务和3个归约任务。

未解决问题

那是正确的吗?即使正确,我的问题还没有全部得到解答,因为仍然无法确定多个操作(例如多个映射)是否在一个任务中还是分成一个任务。

他人的说法

Spark中的任务是什么?Spark worker如何执行jar文件?Apache Spark调度程序如何将文件拆分为任务? 与此类似,但我觉得其中并没有清晰地回答我的问题。


如果您能提供更多的见解,我会感激不尽。我有类似的问题。 - Nag
@Nag:我的问题也是希望能获得更多的见解,这就是我问的原因 :-)。这些答案提供了你需要的东西吗?你想要什么样的见解? - Make42
@Nag:嗯,我已经有几年没用Spark了,所以如果我想知道它的工作原理,我必须再次阅读Spark的相关信息(我忘记了大部分细节),而且我的文章可能已经过时了,尤其是因为我的帖子主要参考了Spark 1.x,而且Spark 2.x已经发生了很多变化,就我所记得的而言。但也许这些变化并不涉及后端架构,这也可能是真的。 - Make42
@Nag:我希望我没有给你留下我会深入研究架构的印象...不幸的是,这已经不是我的领域了,但我希望我所写的已经有所帮助。 - Make42
当然不是,非常感谢您的检查!! - Nag
显示剩余2条评论
3个回答

65

你的大纲写得很好。回答你的问题:

  • 对于每个分区的数据,每个 stage 都需要启动一个单独的 task。考虑到每个分区可能驻留在不同的物理位置 - 例如 HDFS 中的块或本地文件系统的目录/卷。

请注意,Stage 的提交由 DAG Scheduler 驱动。这意味着不相互依赖的阶段可以并行提交到集群进行执行,从而最大限度地增加集群的并行化能力。因此,如果我们的数据流操作可以同时进行,我们将会看到多个阶段被启动。

我们可以在以下玩具示例中看到其效果,在该示例中我们执行以下类型的操作:

  • 加载两个数据源
  • 在两个数据源上分别执行一些映射操作
  • 连接它们
  • 在结果上执行一些映射和过滤操作
  • 保存结果

那么我们最终会有多少个阶段?

  • 并行加载两个数据源各自产生 1 个阶段 = 总共 2 个阶段
  • 表示与其他两个阶段有依赖关系的第三个阶段join
  • 注意:所有在连接后的数据上运行的操作都可以在同一个阶段中执行,因为它们必须按顺序进行。启动其他阶段没有好处,因为它们不能在前一个操作完成之前开始工作。

下面是该玩具程序:

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

这里是结果的有向无环图(DAG)

图片描述

现在问题是:有多少个任务?任务数量应该等于

每个阶段(Stage)中的分区(partition)数之和乘以阶段中的任务数(Task)


2
谢谢!请详细说明您对我的文本的回答:1)我的阶段定义是否不全面?听起来我错过了一个要求,即一个阶段不能包含可以并行执行的操作。或者我的描述已经严格意味着这一点了吗?2)作业需要执行的任务数量由分区数确定,而不是处理器或节点数,而可以同时执行的任务数量取决于处理器的数量,对吗?3)一个任务可以包含多个操作? - Make42
1
你的最后一句话是什么意思?毕竟,数字分区可能因阶段而异。你的意思是这就是你为所有阶段配置工作的方式吗? - Make42
哇,你的回答完全没问题,但不幸的是,最后一句话绝对是一个错误的概念。这并不意味着在一个阶段中分区数等于处理器数量,然而,你可以根据你的机器上的核心数设置RDD的分区数。 - Amin Heydari Alashti
这是一个特殊情况 - 但我同意那会造成误导,所以我将其移除。 - WestCoastProjects
@StephenBoesch每个阶段的分区数量都可以不同吗? - flow2k
显示剩余2条评论

35

以下内容可能有助于您更好地理解不同的组成部分:

  • Stage(阶段):是任务的集合。相同的过程针对数据的不同子集(分区)运行。
  • Task(任务):代表在分布式数据集的分区上执行的工作单元。因此,在每个阶段中,任务数等于分区数,或者如您所说,“每个阶段每个分区一个任务”。
  • 每个执行器在一个YARN容器上运行,并且每个容器驻留在一个节点上。
  • 每个阶段利用多个执行器,每个执行器被分配多个vCore。
  • 每个vCore一次只能执行一个任务。
  • 因此,在任何阶段,多个任务可以并行执行。正在运行的任务数=正在使用的vCore数。

2
这是一篇关于Spark架构非常有用的阅读材料: https://0x0fff.com/spark-architecture/ - pedram bashiri
我不明白你所说的第三点。 据我所知,每个节点可以有多个执行器,因此根据第三点: 每个节点应该只有一个执行器。你能澄清这一点吗? - Rituparno Behera
@RituparnoBehera 每个节点都可以有多个容器,因此可以有多个Spark执行器。请查看此链接。https://docs.cloudera.com/runtime/7.0.2/running-spark-applications/topics/spark-yarn-deployment-modes.html - pedram bashiri
我认为执行器不必仅在YARN容器上运行。 - Surya

16
如果我理解正确,有两件(相关的)事情使你困惑:
1)什么决定了任务的内容?
2)什么决定了要执行的任务数量?
Spark引擎将连续rdds上的简单操作“粘合”在一起,例如:
rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

当rdd3被(惰性地)计算时,Spark将为rdd1的每个分区生成一个任务,并且每个任务将执行过滤器和映射以产生rdd3。任务的数量由分区的数量确定。每个RDD都有一个定义好的分区数。对于从HDFS中读取的源RDD(例如使用sc.textFile(...)),分区的数量是输入格式生成的拆分数。某些RDD操作可能会导致具有不同分区数的RDD:
rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

另一个例子是连接:
rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

大多数更改分区数量的操作都涉及洗牌。例如,当我们执行以下操作时:

rdd2 = rdd1.repartition( 1000 ) 

实际发生的是,rdd1每个分区的任务需要生成一个最终输出,以便下一阶段可以读取,从而使rdd2正好有1000个分区(他们如何做到的?哈希排序)。这一侧的任务有时被称为“Map(侧)任务”。 稍后在rdd2上运行的任务将对一个分区(rdd2!)进行操作,并且必须找出如何读取/组合与该分区相关的映射侧输出。这一侧的任务有时被称为“Reduce(侧)任务”。
这两个问题是相关的:一个阶段中的任务数量是连续rdds共同拥有的分区数,而rdd的分区数可以在阶段之间更改(例如通过为某些洗牌操作指定分区数)。

一旦阶段的执行开始,它的任务可以占用任务槽。并发任务槽的数量为 numExecutors * ExecutorCores。通常情况下,这些槽可以由不同的、非依赖的阶段的任务占用。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接