什么是Spark转换会导致Shuffle?

45

我在Spark文档中难以找到导致洗牌(shuffle)和不导致的操作。在这个列表中,哪些操作会导致洗牌(shuffle),哪些不会?

map和filter不会导致洗牌(shuffle)。然而,其他操作我不确定。

map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks]))
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
4个回答

51

实际上,即使没有文档,也非常容易找到这个信息。对于这些函数中的任何一个,只需创建一个RDD并调用debug string即可。下面是一个示例,您可以自己完成其余部分。

scala> val a  = sc.parallelize(Array(1,2,3)).distinct
scala> a.toDebugString
MappedRDD[5] at distinct at <console>:12 (1 partitions)
  MapPartitionsRDD[4] at distinct at <console>:12 (1 partitions)
    **ShuffledRDD[3] at distinct at <console>:12 (1 partitions)**
      MapPartitionsRDD[2] at distinct at <console>:12 (1 partitions)
        MappedRDD[1] at distinct at <console>:12 (1 partitions)
          ParallelCollectionRDD[0] at parallelize at <console>:12 (1 partitions)

正如您所看到的,distinct会创建一个洗牌。找出这种方式特别重要,而不是通过文档,因为在某些情况下,某些函数需要或不需要进行洗牌。例如,连接通常需要进行洗牌,但如果您连接两个来自同一RDD分支的RDD,则Spark有时会省略洗牌。


15
关于编程的一个通常很好的观点是,试着通过阅读代码和使用代码来回答问题,而不是阅读文档。文档通常会包含信息的重复,而重复会导致错误和信息的丢失。当阅读代码时,可以准确地知道正在发生什么 :) - samthebest
1
根据文档,toDebugString 返回“用于调试的此 RDD 及其递归依赖项的说明。” 因此,即使最近的转换不涉及 shuffle,它也将包括来自先前转换的可能 shuffle,对吗? - CyberPlayerOne

25

谢谢,我认为这应该是答案,但我不是答案的保管人 ;) - SparkleGoat

5

4
这里是关于洗牌转换的概括性说明。
可以引起洗牌的转换包括重新分区操作,如repartitioncoalesce,除计数外的ByKey操作,如groupByKeyreduceByKey,以及连接操作,如cogroupjoin来源

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接