Apache Spark中何时发生数据洗牌?

18

我正在优化Spark中的参数,并想确切了解Spark如何进行数据洗牌。

准确地说,我有一个简单的单词计数程序,并想知道spark.shuffle.file.buffer.kb如何影响运行时间。目前,当我将此参数设置得很高时(我猜测这会防止每个任务的缓冲区同时适合内存中),我只看到了减速。

有人能解释一下Spark是如何执行缩减的吗?例如,数据在RDD中被读取和分区,并且当调用“action”函数时,Spark向工作节点发送任务。如果该操作是一个缩减,Spark如何处理它,shuffle文件/缓冲区与此过程有何关系?

2个回答

32

问题:关于Spark何时触发洗牌(shuffling)的问题?

答案:任何joincogroupByKey操作都涉及将对象保存在哈希映射表或内存缓冲区中进行分组或排序。joincogroupgroupByKey在触发它们的Shuffle所在的任务中使用这些数据结构。而reduceByKeyaggregateByKey则在触发Shuffle所在的任务阶段的两边使用数据结构。

解释:Spark中如何实现Shuffle操作?

与Hadoop相比,Spark中的Shuffle操作实现方式略有不同。我不知道你是否熟悉Hadoop中的工作原理,但我们先专注于Spark。

Map端,Spark中每个Map任务都会为每个Reducer写出一个Shuffle文件(操作系统磁盘缓冲器),这对应于Spark中的逻辑块。这些文件并不是中介文件,因为Spark不会将它们合并成更大的分区。由于Spark中调度开销较小,Mapper数量(M)和Reducer数量(R)比Hadoop中大得多。因此,将M*R个文件传递给相应的Reducer可能会导致显著的开销。

与Hadoop类似,Spark还提供了一个参数spark.shuffle.compress来指定压缩库以压缩Map输出结果。在这种情况下,可以使用默认的SnappyLZF作为压缩库。使用Snappy时,每个打开的文件只使用33KB的缓冲区,从而显著降低了遇到内存不足错误的风险。

reduce 端,Spark要求所有的洗牌数据都能够适应相应 reducer 任务的内存,这与 Hadoop 相反,后者有一个选项将数据溢写到磁盘。当 reducer 任务需要全部洗牌数据进行 GroupByKeyReduceByKey 操作时,才会发生这种情况。在这种情况下,Spark 会抛出 OutOfMemory 异常,这对开发人员来说是一个巨大的挑战。
另外,在 Spark 中没有重叠的复制阶段,而 Hadoop 有一个重叠的复制阶段,在该阶段,mapper 甚至在 map 完成之前就将数据推送给 reducers。这意味着在 Spark 中,洗牌是一种“拉取”操作,而在 Hadoop 中则是一种“推送”操作。每个 reducer 还应维护一个网络缓冲区以获取 map 输出。此缓冲区的大小通过参数 spark.reducer.maxMbInFlight 指定(默认情况下为48MB)。
关于 Apache Spark 中的洗牌更多信息,建议阅读以下内容:

我有一个后续问题:Spark如何选择哪些任务将作为哪些Reducer?换句话说,Spark如何决定从哪里“拉取”用于减少的Shuffle文件? - cnnrznn
1
答案比较长,建议您阅读这个问题的答案。 - eliasah
1
@eliasah,Spark是否仍需要将所有洗牌数据适合相应的Reducer任务的内存中?在我的Spark程序中,ReduceByKey操作在这种情况下会抛出内存不足异常。有没有办法处理这个问题? - CRM
1
我不知道有任何关于那个的变化,所以我会说“是的,那个属性仍然有效”。至于你的第二个问题,在评论中回答有点宽泛,我建议你提出一个新的问题,并提供更多细节。(@zero323,你觉得呢?) - eliasah
抱歉,我是一名Spark新手,对于句子中的“shipping”一词并不十分理解,即“因此,将M*R文件发送到相应的Reducer可能会导致显着的开销。”这应该像copymove操作还是其他什么?我之前尝试过搜索,但没有找到合适的描述。 - Bowen Peng
1
@BowenPeng,我所说的关于“shipping”的意思确实是复制/移动的行为。这回答了你的问题吗? - eliasah

2

当数据需要在执行器(工作节点)之间移动时,就会发生这种情况。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接