repartition()
版本,称为coalesce()
,它允许避免数据移动,但前提是您减少了RDD分区的数量。我得到的一个区别是,使用
repartition()
可以增加/减少分区的数量,而使用coalesce()
只能减少分区的数量。如果分区分布在多台计算机上,并且运行
coalesce()
,那么它如何避免数据移动?repartition()
版本,称为coalesce()
,它允许避免数据移动,但前提是您减少了RDD分区的数量。repartition()
可以增加/减少分区的数量,而使用coalesce()
只能减少分区的数量。coalesce()
,那么它如何避免数据移动?它避免了完全洗牌。如果知道数字是递减的,那么执行器可以安全地保留数据在最小数量的分区上,只将数据从额外的节点移动到我们保留的节点上。
因此,它会像这样进行:
Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12
然后将数据 coalesce
到2个分区:
Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)
请注意,节点1和节点3不需要其原始数据即可移动。
Justin的回答很棒,这个回复更深入地解释了一下。
repartition
算法会进行完整的洗牌(shuffle),并创建具有均匀数据分布的新分区。让我们创建一个包含1到12的数字的DataFrame。
val x = (1 to 12).toList
val numbersDf = x.toDF("number")
numbersDf
在我的机器上有4个分区。
numbersDf.rdd.partitions.size // => 4
以下是数据在分区上的划分方式:
Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12
我们使用repartition
方法进行完全洗牌,并将此数据分配到两个节点上。
val numbersDfR = numbersDf.repartition(2)
这是我机器上对numbersDfR
数据进行分区的方式:
Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11
repartition
方法可以创建新的分区,并将数据均匀地分配到这些新的分区中(对于较大的数据集,数据分布更加均匀)。
coalesce
和 repartition
的区别
coalesce
使用现有的分区来最小化需要洗牌的数据量,而 repartition
则创建新分区并执行完整的洗牌操作。结果是,coalesce
生成的分区具有不同数量的数据(有时是非常不同的大小),而 repartition
生成的分区大致相等并且数据被均匀分布。
coalesce
或 repartition
哪个更快?
coalesce
运行速度可能比 repartition
更快,但是处理不均匀大小的分区通常比相同大小的分区要慢。通常在过滤大数据集后需要重新分区。我发现 repartition
总体上更快,因为 Spark 是构建用于处理相同大小分区的。
注意:我好奇地观察到 repartition 可能会增加磁盘上的数据大小。确保在使用 repartition/coalesce 处理大型数据集时运行测试。
实践中何时使用 coalesce 和 repartition
rdd.glom().map(len).collect()
,但它会产生很多OOM错误。 - anwartheravianrepartition
- 推荐在增加分区数量时使用,因为它涉及所有数据的洗牌。
coalesce
- 推荐在减少分区数量时使用。例如,如果您有3个分区并且要将其减少到2个,则coalesce
将第三个分区数据移动到分区1和2中。分区1和2将保持在同一容器中。
另一方面,repartition
会对所有分区的数据进行洗牌,因此执行者之间的网络使用率会很高,并影响性能。
当减少分区数量时,coalesce
比repartition
表现更好。
这里需要注意的是,Spark RDD的基本原则是不可变性。repartition或coalesce操作将创建新的RDD。基础RDD将继续存在,并保持其原始分区数。如果使用情况要求将RDD持久保存在缓存中,则必须对新创建的RDD执行相同的操作。
scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26
scala> res16.partitions.length
res17: Int = 10
scala> pairMrkt.partitions.length
res20: Int = 2
即使分区数量减少的情况下,使用repartition >> coalesce 也有用例。这在将数据写入单个文件时非常有用。
@Rob的答案启示了正确的方向,但我认为还需要进一步解释才能理解底层正在发生什么。
如果您需要在写入之前过滤数据,则 repartition 比 coalesce 更适合,因为coalesce会在加载操作之前被推送下去。
例如:
load().map(…).filter(…).coalesce(1).save()
转换为:
load().coalesce(1).map(…).filter(…).save()
这意味着所有数据都将折叠成一个分区,然后进行过滤,失去所有并行性。
即使是像column='value'
这样的非常简单的过滤器也会发生这种情况。
这种情况不会发生在repartition中:load().map(…).filter(…).repartition(1).save()
在这种情况下,过滤在原始分区上并行进行。
仅仅为了给出一个数量级,在我的情况下,从Hive表加载109M行(~105G),使用 ~1000个分区进行过滤,coalesce(1)的运行时间从 ~6小时下降到了repartition(1)的 ~2分钟。
这个具体的例子来自AirBnB的这篇文章,该文章非常好,并且涵盖了Spark重分区技术的更多方面。coalesce(n)
与coalesce(n, shuffle = false)
相同,repartition(n)
与coalesce(n, shuffle = true)
相同。coalesce
和repartition
都可用于增加分区数。shuffle = true
,您实际上可以将分区合并到更大的数量。如果您有少量分区(例如100个),可能有少量异常大的分区,则这非常有用。coalesce
的shuffle版本(在该情况下与repartition
相同)。这将使您的计算在父分区上并行执行(多个任务)。numPartitions = 1
,则可能会导致计算发生在您不希望的较少的节点上(例如,在numPartitions = 1
的情况下为一个节点)。为避免此问题,您可以传递shuffle = true
。这将添加一个洗牌步骤,但意味着当前的上游分区将以并行方式执行(根据当前的分区方式)。
请参考相关答案这里
所有的回答都为这个常被问到的问题增加了一些很棒的知识。
所以按照这个问题时间线的传统,以下是我的意见。
我发现在非常特定的情况下,重新分区比合并分区更快。
在我的应用程序中,当我们估计的文件数低于某个阈值时,重新分区的速度更快。
这就是我的意思。
if(numFiles > 20)
df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
在上面的代码片段中,如果我的文件少于20个,合并操作将需要很长时间来完成,而重新分区会快得多,因此使用了上面的代码。
当然,这个数字(20)将取决于工作者数量和数据量。
希望能有所帮助。
sc._jsc.sc().getExecutorMemoryStatus().size()
其中,sc是一个pyspark SparkContext对象。如果您使用的是scala或java,则更简单:
sc.getExecutorMemoryStatus().size()
- Nolan Barth重新分区(Repartition):将数据随机分为新数量的分区。
例如,初始数据框被划分为200个分区。
df.repartition(500)
:数据将从200个分区洗牌到新的500个分区。
合并分区(Coalesce):将数据随机分配到现有数量的分区中。
df.coalesce(5)
:数据将从剩余的195个分区洗牌到现有的5个分区中。
repartition
会忽略现有的分区并创建新的分区。因此,您可以使用它来解决数据倾斜问题。您可以提到分区键以定义分布。数据倾斜是'大数据'问题空间中最大的问题之一。
coalesce
将与现有分区一起工作并洗牌其中的子集。它不能像repartition
那样解决数据倾斜问题。因此,即使它更便宜,也可能不是您需要的东西。重新分区(Repartition)基本上允许您增加或减少分区的数量。重新分区会重新分配来自所有分区的数据,这会导致完全洗牌,这是非常昂贵的操作。
合并分区(Coalesce)是重新分区的优化版本,在此版本中,您只能减少分区的数量。由于我们只能减少分区的数量,因此它会将某些分区合并为单个分区。通过合并分区,与重新分区相比,在分区之间传输数据的移动更少。因此,在合并分区中,数据移动最小,但说合并分区不进行数据移动是完全错误的陈述。
另一件事是,在重新分区中,通过提供分区数,它尝试在所有分区上均匀地重新分配数据,而在合并分区的情况下,我们仍然可能存在某些情况下的数据倾斜。
coalesce
之外,是否有任何情况下需要使用repartition
? - TheMPrepartition
做的就是使用shuffle
参数设置为 true 来调用coalesce
。如果有帮助,请告诉我。 - Justin Pihony