Spark - repartition()和coalesce()的区别

417
根据《学习Spark》,请记住重新分区数据是一项相当昂贵的操作。Spark还有一个经过优化的repartition()版本,称为coalesce(),它允许避免数据移动,但前提是您减少了RDD分区的数量。
我得到的一个区别是,使用repartition() 可以增加/减少分区的数量,而使用coalesce()只能减少分区的数量。
如果分区分布在多台计算机上,并且运行coalesce(),那么它如何避免数据移动?
20个回答

511

它避免了完全洗牌。如果知道数字是递减的,那么执行器可以安全地保留数据在最小数量的分区上,只将数据从额外的节点移动到我们保留的节点上。

因此,它会像这样进行:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

然后将数据 coalesce 到2个分区:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

请注意,节点1和节点3不需要其原始数据即可移动。


160
谢谢您的回复。文档应该更明确地说“最小化数据移动”而不是“避免数据移动”。 - Praveen Sripati
17
在使用coalesce之外,是否有任何情况下需要使用repartition - TheMP
32
@Niemand,我认为当前的文档已经很清楚地涵盖了这个问题:https://github.com/apache/spark/blob/128c29035b4e7383cc3a9a6c7a9ab6136205ac6c/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L376 请记住,所有 repartition 做的就是使用 shuffle 参数设置为 true 来调用 coalesce。如果有帮助,请告诉我。 - Justin Pihony
3
能否减少已经存在的分区文件的数量?我没有HDFS,但是有很多文件的问题。 - user6023611
3
重新分区会统计上变慢,因为它不知道正在缩小...但也许他们可以进行优化。在内部,它只是带有“shuffle = true”标志调用coalesce函数。 - Justin Pihony
显示剩余9条评论

314

Justin的回答很棒,这个回复更深入地解释了一下。

repartition算法会进行完整的洗牌(shuffle),并创建具有均匀数据分布的新分区。让我们创建一个包含1到12的数字的DataFrame。

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf 在我的机器上有4个分区。

numbersDf.rdd.partitions.size // => 4

以下是数据在分区上的划分方式:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

我们使用repartition方法进行完全洗牌,并将此数据分配到两个节点上。

val numbersDfR = numbersDf.repartition(2)

这是我机器上对numbersDfR数据进行分区的方式:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

repartition 方法可以创建新的分区,并将数据均匀地分配到这些新的分区中(对于较大的数据集,数据分布更加均匀)。

coalescerepartition 的区别

coalesce 使用现有的分区来最小化需要洗牌的数据量,而 repartition 则创建新分区并执行完整的洗牌操作。结果是,coalesce 生成的分区具有不同数量的数据(有时是非常不同的大小),而 repartition 生成的分区大致相等并且数据被均匀分布。

coalescerepartition 哪个更快?

coalesce 运行速度可能比 repartition 更快,但是处理不均匀大小的分区通常比相同大小的分区要慢。通常在过滤大数据集后需要重新分区。我发现 repartition 总体上更快,因为 Spark 是构建用于处理相同大小分区的。

注意:我好奇地观察到 repartition 可能会增加磁盘上的数据大小。确保在使用 repartition/coalesce 处理大型数据集时运行测试。

如果您需要更多细节,请阅读此博客文章。

实践中何时使用 coalesce 和 repartition


14
谢谢@Powers的好回答,但是A和B分区中的数据不是有偏的吗?如何使它们均匀分布? - anwartheravian
1
另外,获取分区大小的最佳方法是什么,而不会出现OOM错误。我使用rdd.glom().map(len).collect(),但它会产生很多OOM错误。 - anwartheravian
23
Partition A和Partition B的大小不同,是因为“重新分区(repartition)”算法在对非常小的数据集进行分发时无法平均分配数据。我使用“repartition”将500万条记录组织到13个分区中,每个文件大小在89.3 MB至89.6 MB之间 - 这相当平均! - Powers
1
@Powers 这个回答看起来更好,有详细说明。 - Green
1
这样解释更好。谢谢! - Abhi
显示剩余14条评论

50

repartition - 推荐在增加分区数量时使用,因为它涉及所有数据的洗牌。

coalesce - 推荐在减少分区数量时使用。例如,如果您有3个分区并且要将其减少到2个,则coalesce将第三个分区数据移动到分区1和2中。分区1和2将保持在同一容器中。 另一方面,repartition会对所有分区的数据进行洗牌,因此执行者之间的网络使用率会很高,并影响性能。

当减少分区数量时,coalescerepartition表现更好。


有用的解释。 - Narendra Maru
@Kamalesan C - 用简单的语言讲解得非常好,我真希望能给这个回答多次点赞。 - Nitin Ware
你没有提到:Spark将会尽可能地将合并操作向下推迟到最早的时间点。这意味着它可以影响之前操作的并行性(减少)。 - idan ahal

36

这里需要注意的是,Spark RDD的基本原则是不可变性。repartition或coalesce操作将创建新的RDD。基础RDD将继续存在,并保持其原始分区数。如果使用情况要求将RDD持久保存在缓存中,则必须对新创建的RDD执行相同的操作。

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2

1
不错!对于有经验的Scala开发人员来说,这是至关重要的,但很明显——即,无论是_repartition_还是_coalesce_都不会尝试修改数据,只是修改了它在节点之间的分布方式。 - doug
1
@Harikrishnan,如果我正确理解了其他答案,那么根据它们的说法,在coalesce情况下Spark使用现有分区,但是由于RDD是不可变的,您能描述一下coalesce如何利用现有分区吗?根据我的理解,我认为Spark会将新分区附加到现有分区中。 - Explorer
但是,如果“旧”的RDD不再被使用,因为执行图已知道它,如果没有持久化,它将从内存中清除,对吗? - Markus

20

即使分区数量减少的情况下,使用repartition >> coalesce 也有用例。这在将数据写入单个文件时非常有用。

@Rob的答案启示了正确的方向,但我认为还需要进一步解释才能理解底层正在发生什么。

如果您需要在写入之前过滤数据,则 repartition coalesce 更适合,因为coalesce会在加载操作之前被推送下去。

例如: load().map(…).filter(…).coalesce(1).save()

转换为: load().coalesce(1).map(…).filter(…).save()

这意味着所有数据都将折叠成一个分区,然后进行过滤,失去所有并行性。 即使是像column='value'这样的非常简单的过滤器也会发生这种情况。

这种情况不会发生在repartition中:load().map(…).filter(…).repartition(1).save()

在这种情况下,过滤在原始分区上并行进行。

仅仅为了给出一个数量级,在我的情况下,从Hive表加载109M行(~105G),使用 ~1000个分区进行过滤,coalesce(1)的运行时间从 ~6小时下降到了repartition(1)的 ~2分钟。

这个具体的例子来自AirBnB的这篇文章,该文章非常好,并且涵盖了Spark重分区技术的更多方面。

你对这个非常确定吗?我会在今天下午检查。 - thebluephantom
在编写时,使用Spark 2.4.x可以达到100%的效果,尚未尝试过更新版本,如果您尝试了,请告诉我们! :) - Alessandro S.
好的,我这周晚些时候会看一下 Databricks 模拟。谢谢。 - thebluephantom
我在CSV文件上尝试了这个。使用Spark v2.4.5,但是我在DAG上看不到正确的顺序。coalesce并没有首先出现。您能否添加一些更多的细节,例如DAG或物理计划,以显示coalesce将被优先推下去。 - Mohana B C
我发现由于过滤器,发生了与我完全相同的事情。 - Partha Mandal

19
code和代码文档可以得出的结论是,coalesce(n)coalesce(n, shuffle = false)相同,repartition(n)coalesce(n, shuffle = true)相同。
因此,coalescerepartition都可用于增加分区数。
使用shuffle = true,您实际上可以将分区合并到更大的数量。如果您有少量分区(例如100个),可能有少量异常大的分区,则这非常有用。
另一个需要强调的重要注意事项是,如果您显著减少分区数量,则应考虑使用coalesceshuffle版本(在该情况下与repartition相同)。这将使您的计算在父分区上并行执行(多个任务)。
但是,如果您要进行大规模合并,例如到numPartitions = 1,则可能会导致计算发生在您不希望的较少的节点上(例如,在numPartitions = 1的情况下为一个节点)。为避免此问题,您可以传递shuffle = true。这将添加一个洗牌步骤,但意味着当前的上游分区将以并行方式执行(根据当前的分区方式)。

请参考相关答案这里


16

所有的回答都为这个常被问到的问题增加了一些很棒的知识。

所以按照这个问题时间线的传统,以下是我的意见。

我发现在非常特定的情况下,重新分区比合并分区更快

在我的应用程序中,当我们估计的文件数低于某个阈值时,重新分区的速度更快。

这就是我的意思。

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

在上面的代码片段中,如果我的文件少于20个,合并操作将需要很长时间来完成,而重新分区会快得多,因此使用了上面的代码。

当然,这个数字(20)将取决于工作者数量和数据量。

希望能有所帮助。


与其将其设置为像20这样的硬数字,更有意义的是将文件数与集群中节点数进行比较。您可以使用以下代码行获取执行程序的数量: sc._jsc.sc().getExecutorMemoryStatus().size()其中,sc是一个pyspark SparkContext对象。如果您使用的是scala或java,则更简单: sc.getExecutorMemoryStatus().size() - Nolan Barth

11

重新分区(Repartition):将数据随机分为新数量的分区。

例如,初始数据框被划分为200个分区。

df.repartition(500):数据将从200个分区洗牌到新的500个分区。

合并分区(Coalesce):将数据随机分配到现有数量的分区中。

df.coalesce(5):数据将从剩余的195个分区洗牌到现有的5个分区中。


8
我想补充一下Justin和Power的答案 - repartition会忽略现有的分区并创建新的分区。因此,您可以使用它来解决数据倾斜问题。您可以提到分区键以定义分布。数据倾斜是'大数据'问题空间中最大的问题之一。 coalesce将与现有分区一起工作并洗牌其中的子集。它不能像repartition那样解决数据倾斜问题。因此,即使它更便宜,也可能不是您需要的东西。

8

重新分区(Repartition)基本上允许您增加或减少分区的数量。重新分区会重新分配来自所有分区的数据,这会导致完全洗牌,这是非常昂贵的操作。

合并分区(Coalesce)是重新分区的优化版本,在此版本中,您只能减少分区的数量。由于我们只能减少分区的数量,因此它会将某些分区合并为单个分区。通过合并分区,与重新分区相比,在分区之间传输数据的移动更少。因此,在合并分区中,数据移动最小,但说合并分区不进行数据移动是完全错误的陈述。

另一件事是,在重新分区中,通过提供分区数,它尝试在所有分区上均匀地重新分配数据,而在合并分区的情况下,我们仍然可能存在某些情况下的数据倾斜。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接