Spark:RDD的最佳分区数和元素数量是否有经验法则?

8

RDD中包含的元素数量和其理想分区数之间是否有关系?

我有一个RDD,它有成千上万个分区(因为我从由多个小文件组成的源文件中加载它,这是一个我无法解决的限制,所以我必须处理它)。我想重新分区它(或使用coalesce方法)。但我不知道RDD将包含的确切事件数量。
因此,我希望以自动化方式完成这项工作。类似于:

val numberOfElements = rdd.count()
val magicNumber = 100000
rdd.coalesce( numberOfElements / magicNumber)

有没有什么经验法则可以确定RDD的最佳分区数和其元素数量?

谢谢。

2个回答

8
没有确定的答案,因为它高度依赖于应用程序、资源和数据。有一些硬性限制(例如各种2GB限制),但其余的必须根据任务进行调整。需要考虑以下几个因素:
  • 单行/元素的大小
  • 典型操作的成本。如果有小分区并且操作很便宜,则调度成本可能比数据处理成本高得多。
  • 执行基于分区的(例如排序)操作时,处理分区的成本。

如果核心问题是初始文件的数量,则使用某种变体的CombineFileInputFormat可能比重新分区/合并更好。例如:

sc.hadoopFile(
  path,
  classOf[CombineTextInputFormat],
  classOf[LongWritable], classOf[Text]
).map(_._2.toString)

另请参阅如何计算最佳的coalesce分区数量?


2
虽然我完全同意zero323的观点,但是你仍然可以实现某种启发式算法。我们内部采用了存储为avro键值对的数据大小和压缩后计算分区数,以便每个分区不超过64MB(totalVolume/64MB~分区数)。偶尔我们会运行自动作业来重新计算每种输入类型的“最优”分区数等。在我们的情况下,这很容易做到,因为输入来自hdfs(s3可能也可以)。再次强调,这取决于您的计算和数据,因此您的数字可能完全不同。

@zero323 @Igor Berman,请问调整Spark性能时应该根据记录数还是字节数来权衡partition的大小?我的Spark作业从MySQL中以并行方式读取数据失败了(详情请见https://dev59.com/Janka4cB1Zd3GeqPQpu5),我怀疑这可能是由于`partition`的大小。[这个链接](https://www.slideshare.net/hadooparchbook/top-5-mistakes-when-writing-spark-applications-66374492/39) 表示,partition的大小应该是** ~ 128 MB (没有提及行数),但我的partition大小可以达到 ~ 10 GB** ,包含约** ~ 15 M** 条记录(如果读取成功的话)。 - y2k-shubham
1
@y2k-shubham 这要看情况。你可以应用其中的任意一种。我在某些项目中见过按计数,而在另一个项目中则是按字节计算。1个分区的10GB太大了...关于最佳大小(64MB、128MB左右 - 你需要测试,无论如何,在我看来它都低于1GB)。 - Igor Berman
@Igor Berman 我知道 ~ 10GB 对于分区来说是太大了,但这是因为我的 DataFrame 是如何创建的。我正在使用 Spark Jdbc 从 MySQL 中读取表。根据我的 MySQL 实例大小,我只能将读操作并行化到 ~40个连接(numPartitions = 40)。因此,创建的 DataFrame 的某些分区最终会变得非常大。我可以在创建后重新分区到更小的大小,但它们仍将保持这么大。由于我无法控制 MySQL,所以无法想出解决方法。 - y2k-shubham
1
@y2k-shubham 你熟悉 https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#manage-parallelism 吗?我没有太多使用jdbc的经验,所以无法给你任何建议。如果您创建了1000个分区,但并行度为40会发生什么?(您将拥有相对较小的分区,但由于并行度为40(或最大核心),因此不会超过连接使用)。这不是完美的解决方案,但mysql不适用于大数据处理。您可以通过两个步骤完成,使用并行度40选择并将其存储到hdfs / s3中,然后使用正常并行度运行。 - Igor Berman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接