如何调整Spark执行器数量、核心数和执行器内存?

110

如何开始调整上述参数。我们是从执行器内存和执行器数量开始,还是从核心数开始获取执行器数量。我遵循了 链接 ,但只得到了一个高级别的想法,仍然不确定如何或从何处开始,并达成最终结论。

2个回答

256
以下答案涵盖了标题中提到的三个主要方面,即执行器数量、执行器内存和核心数量。可能还有其他参数,如驱动程序内存和其他参数,本答案未涉及,但希望在不久的将来添加。
情况1硬件-6个节点,每个节点16个核心,64 GB RAM
每个执行器都是一个JVM实例。因此,我们可以在单个节点上拥有多个执行器。
首先需要1个核心和1 GB的操作系统和Hadoop守护程序,因此每个节点可用15个核心,63 GB RAM。
从选择核心数开始:
Number of cores = Concurrent tasks as executor can run 

So we might think, more concurrent tasks for each executor will give better performance. But research shows that
any application with more than 5 concurrent tasks, would lead to bad show. So stick this to 5.

This number came from the ability of executor and not from how many cores a system has. So the number 5 stays same
even if you have double(32) cores in the CPU.

执行者数量:

Coming back to next step, with 5 as cores per executor, and 15 as total available cores in one Node(CPU) - we come to 
3 executors per node.

So with 6 nodes, and 3 executors per node - we get 18 executors. Out of 18 we need 1 executor (java process) for AM in YARN we get 17 executors

This 17 is the number we give to spark using --num-executors while running from spark-submit shell command

每个执行器的内存:
From above step, we have 3 executors  per node. And available RAM is 63 GB

So memory for each executor is 63/3 = 21GB. 

However small overhead memory is also needed to determine the full memory request to YARN for each executor.
Formula for that over head is max(384, .07 * spark.executor.memory)

Calculating that overhead - .07 * 21 (Here 21 is calculated as above 63/3)
                            = 1.47

Since 1.47 GB > 384 MB, the over head is 1.47.
Take the above from each 21 above => 21 - 1.47 ~ 19 GB

So executor memory - 19 GB

最终数字 - 执行器 - 17,核心5,执行器内存 - 19 GB。

案例2 硬件: 相同的6个节点,32个核心,64 GB

5对于良好的并发性是相同的

每个节点的执行器数量= 32/5〜6

因此总执行器数= 6 * 6个节点= 36。然后最终数字为36-1用于AM = 35

执行器内存为:每个节点6个执行器。63/6〜10。开销为0.07 * 10 = 700 MB。 因此将开销四舍五入到1GB,我们得到10-1 = 9 GB

最终数字-执行器-35,核心5,执行器内存-9 GB


案例3

以上情况都是以核心数为固定值,然后移动到执行器数量和内存的情况。

现在对于第一个情况,如果我们认为不需要19 GB,只有10 GB就足够了,那么以下是数字:

核心数 5 每个节点的执行器数量 = 3

在这个阶段,这将导致21,然后根据我们的第一次计算变为19。但是由于我们认为10就可以了(假设有点余地),所以我们不能将每个节点的执行器数量切换为6(如63/10)。因为每个节点有6个执行器和5个核心时,它会降至每个节点30个核心,而我们只有16个核心。因此,我们还需要更改每个执行器的核心数。

因此再次计算,

神奇的数字5变成了3(任何小于或等于5的数字)。因此,使用3个核心和15个可用核心-我们每个节点得到5个执行器。因此(5 * 6-1)= 29个执行器

因此内存为63/5〜12。超额头寸为12 * .07 = .84 因此,执行器内存为12-1 GB = 11 GB

最终数字为29个执行器,3个核心,执行器内存为11 GB。

动态分配:

注意:如果启用了动态分配,则执行器数量的上限。这意味着Spark应用程序可以在需要时占用所有资源。因此,在运行其他应用程序并且它们也需要核心来运行任务的群集中,请确保您在群集级别上执行此操作。我的意思是,您可以根据用户访问为YARN分配特定数量的核心。因此,您可以创建spark_user,然后为该用户提供核心(最小/最大)。这些限制是为Spark和在YARN上运行的其他应用程序之间共享而设置的。

spark.dynamicAllocation.enabled - 当设置为true时 - 我们无需提及执行器。原因如下:

我们在spark-submit中给出的静态参数是整个作业持续时间的参数。但是,如果涉及到动态分配,则会有不同的阶段,例如

从哪里开始:

初始执行器数(spark.dynamicAllocation.initialExecutors

多少个:

根据负载(待处理任务)确定需要请求多少个执行器。这最终将是我们以静态方式在spark-submit中提供的数字。因此,一旦设置了初始执行器数量,我们就会使用最小值(spark.dynamicAllocation.minExecutors)和最大值(spark.dynamicAllocation.maxExecutors)的数字。
何时请求或提供:
何时请求新的执行器(spark.dynamicAllocation.schedulerBacklogTimeout)- 有待处理任务达到此时间。所以请求数量每轮指数级增加,从上一轮开始。例如,应用程序将在第一轮中添加1个执行器,然后在后续轮次中添加2、4、8等执行器。在特定时间点,上述最大值就会出现。
何时放弃一个执行器(spark.dynamicAllocation.executorIdleTimeout)-
如有遗漏,请纠正。以上是我基于问题中分享的博客和一些在线资源所理解的内容。谢谢。
参考资料:

2
我在某处读到独立模式下每个节点只有一个执行程序,你有什么想法吗?我没有看到这方面的涵盖在你的答案中。 - jangorecki
2
在独立集群中,默认情况下,每个工作节点会有一个执行器。我们需要调整spark.executor.cores参数,以便让一个工作节点拥有足够的核心数来运行多个执行器。 - Ramzy
是的,核心的默认值是无限的,就像他们所说的那样。因此,除非您指定,否则Spark可以使用所有可用的核心。 - Ramzy
2
@Ramzy 我认为需要注意的是,即使使用动态分配,您仍应指定spark.executor.cores来确定Spark将要分配的每个执行器的大小。 否则,每当Spark要向您的应用程序分配新的执行器时,它都会分配整个节点(如果可用),即使您只需要五个核心。 - Dan Markhasin
如何调整驱动程序内存和驱动程序核心? - Anchika Agarwal
显示剩余9条评论

6
此外,这也取决于您的使用情况,一个重要的配置参数是:spark.memory.fraction(用于执行和存储的堆空间-300MB的分数)来自http://spark.apache.org/docs/latest/configuration.html#memory-management。如果您不使用缓存/持久化,请将其设置为0.1,以便您的程序占用所有内存。如果您使用缓存/持久化,可以检查所占用的内存。
sc.getExecutorMemoryStatus.map(a => (a._2._1 - a._2._2)/(1024.0*1024*1024)).sum

你是从HDFS还是HTTP读取数据?

再次强调,调优取决于你的使用情况。


你知道在使用 PySpark 时,map 命令会是什么样子吗?我使用 sc._jsc.sc().getExecutorMemoryStatus() 获取执行器状态,但无法对其返回的内容进行任何操作... - Thomas
1
抱歉 @Thomas Decaux,但您是不是想设置 spark.memory.storageFraction=0.1?因为据我所知,spark.memory.fraction 决定了 Spark 用于 执行和存储 的内存量(您已经提到了这一点),只有可用于存储和执行的内存的 spark.memory.storageFraction 是免受 缓存驱逐 的影响。请参见此链接 - y2k-shubham
@Thomas 如果我的应用程序中只有persist(StorageLevel.DISK_ONLY),那么这个选项也适用,对吧?它只影响内存分数,但不会影响任何磁盘溢出吗? - jk1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接