Spark:编程方式获取集群核心数

16

我在yarn集群中运行我的Spark应用程序。在我的代码中,我使用队列可用核心的数量来为我的数据集创建分区:

Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
我的问题是:如何通过编程方式获取队列可用核心数量,而不是通过配置方式?

你使用哪个资源管理器?YARN还是Mesos? - Rahul Sharma
我正在使用yarn。 - Rougher
yarn集群API中提取所需的队列参数,然后在coalesce中使用。 - Rahul Sharma
5个回答

20

从Spark中可以获取执行器数量和集群中核心数的方法。以下是我过去使用过的一些Scala实用程序代码。您应该很容易将其适应于Java。有两个关键想法:

  1. 工作进程数是执行器数减一或sc.getExecutorStorageStatus.length - 1

  2. 每个工作进程的核心数可以通过在工作进程上执行java.lang.Runtime.getRuntime.availableProcessors来获得。

其余的代码是为了使用Scala隐式添加便捷方法到SparkContext,这是模板代码。我几年前为1.x编写了这段代码,这就是为什么它没有使用SparkSession的原因。

最后一个要点:通常最好将分区数合并为您的核心数的倍数,因为这可以改善数据倾斜的情况下的性能。在实践中,我使用1.5倍到4倍之间的任何值,具体取决于数据大小和作业是否在共享集群上运行。

import org.apache.spark.SparkContext

import scala.language.implicitConversions


class RichSparkContext(val sc: SparkContext) {

  def executorCount: Int =
    sc.getExecutorStorageStatus.length - 1 // one is the driver

  def coresPerExecutor: Int =
    RichSparkContext.coresPerExecutor(sc)

  def coreCount: Int =
    executorCount * coresPerExecutor

  def coreCount(coresPerExecutor: Int): Int =
    executorCount * coresPerExecutor

}


object RichSparkContext {

  trait Enrichment {
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  }

  object implicits extends Enrichment

  private var _coresPerExecutor: Int = 0

  def coresPerExecutor(sc: SparkContext): Int =
    synchronized {
      if (_coresPerExecutor == 0)
        sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      else _coresPerExecutor
    }

}

更新

最近,getExecutorStorageStatus已被删除。我们改用SparkEnvblockManager.master.getStorageStatus.length - 1(减去1是为了再次排除driver)。通常通过SparkContextenv来访问它,但在org.apache.spark包之外无法访问。因此,我们使用了一种封装违规模式:

package org.apache.spark

object EncapsulationViolator {
  def sparkEnv(sc: SparkContext): SparkEnv = sc.env
}

1
sc.getExecutorStorageStatus.length - 1 对我来说很好。谢谢。 - Rougher
2
有时执行器核心被过度配置或欠配置,这意味着 JVM 运行时函数可能不准确。 - tribbloid
1
@tribbloid 绝对正确,也适用于各种集群管理系统中的复杂动态池配置。这适用于常见/简单情况,并需要针对复杂情况进行调整。 - Sim
3
FYI,自Spark 2.4.4起,getExecutorStorageStatus方法已不再可用。 - Denis Makarenko
1
@Sim 正确。 对我来说,调试可能是更好的词,因为有时候单线程进行调试是很有帮助的。 - Andy Kershaw
显示剩余7条评论

8

在寻找答案时发现了这篇文章,问题和我遇到的差不多。

我发现:

Dataset ds = ...
ds.coalesce(sc.defaultParallelism());

这正是OP所寻求的功能。

例如,我的5个节点 x 8核心集群会将defaultParallelism设置为40。


1
根据Databricks的说法,如果驱动程序和执行程序是相同的节点类型,则可以采用以下方式:
java.lang.Runtime.getRuntime.availableProcessors * (sc.statusTracker.getExecutorInfos.length -1)

java.lang.Runtime.getRuntime.availableProcessors 可以告诉您当前机器上有多少个 CPU。但不能假设所有集群中的机器都是如此。 - James Moore
@JamesMoore 您是正确的。这仅适用于驱动程序和工作节点具有相同节点类型的情况。 - zaxme
@zaxme 我运行了这个程序,只得到了 java.lang.Runtime.getRuntime.availableProcessors 的值,而对于 sc.statusTracker.getExecutorInfos.length 得到了 0。请问有适当的时间请求吗?我正在使用 EMR 集群,而不是 databricks。谢谢! - capt-mac
@capt-mac 我没有使用过EMR,但是也许这个链接对你有用 - https://stackoverflow.com/a/46513788/1450817 - zaxme
@zaxme 谢谢,这看起来像是我需要手动输入信息,建议的线程似乎提供了处理器数量,这是我目前可以获得的。我需要一种方法让执行者提供一个值(sc.statusTracker.getExecutorInfos.length 返回 0),这将允许我以编程方式告诉我的作业应该给出多少个分区,而无需为其设置配置。 - capt-mac

1
你可以在每台机器上运行作业并询问它的内核数,但这不一定是Spark可用的(正如@tribbloid在另一个答案的评论中指出的那样):
import spark.implicits._
import scala.collection.JavaConverters._
import sys.process._
val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
val nCpus = procs.values.sum

在一个只有两个工作节点的小型测试集群上,在shell中运行它会得到以下结果:
scala> :paste
// Entering paste mode (ctrl-D to finish)

    import spark.implicits._
    import scala.collection.JavaConverters._
    import sys.process._
    val procs = (1 to 1000).toDF.map(_ => "hostname".!!.trim -> java.lang.Runtime.getRuntime.availableProcessors).collectAsList().asScala.toMap
    val nCpus = procs.values.sum

// Exiting paste mode, now interpreting.

import spark.implicits._                                                        
import scala.collection.JavaConverters._
import sys.process._
procs: scala.collection.immutable.Map[String,Int] = Map(ip-172-31-76-201.ec2.internal -> 2, ip-172-31-74-242.ec2.internal -> 2)
nCpus: Int = 4

如果您的集群通常有大量机器,请将零添加到您的范围中。即使在我的两台机器集群上,10000个任务也可以在几秒钟内完成。

如果您需要比sc.defaultParallelism()提供的更多信息(例如@SteveC的答案),则可能只有这才有用。


0

对于那些没有使用yarn集群的人:如果你在Python/Databricks中进行操作,这里有一个我编写的函数可以帮助解决这个问题。它将为您获取工作节点数量和CPU数量,并返回您的工作分配的最终CPU计数的乘积。

def GetDistCPUCount():
    nWorkers = int(spark.sparkContext.getConf().get('spark.databricks.clusterUsageTags.clusterTargetWorkers'))
    GetType = spark.sparkContext.getConf().get('spark.databricks.clusterUsageTags.clusterNodeType')
    GetSubString = pd.Series(GetType).str.split(pat = '_', expand = True)
    GetNumber = GetSubString[1].str.extract('(\d+)')
    ParseOutString = GetNumber.iloc[0,0]
    WorkerCPUs = int(ParseOutString)
    nCPUs = nWorkers * WorkerCPUs
    return nCPUs

第四行的“test”是什么?由于这个原因它正在抛出错误。 - Sarath Subramanian

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接