理解Spark中的shuffle管理器

15

让我帮助澄清关于深度洗牌以及Spark如何使用洗牌管理器的问题。我报告了一些非常有用的资源:

https://trongkhoanguyenblog.wordpress.com/

https://0x0fff.com/spark-architecture-shuffle/

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md

阅读它们,我了解到有不同的洗牌管理器。我想专注于其中两个: hash managersort manager(这是默认管理器)。

为了提出我的问题,我想从一个非常常见的转换开始:

val rdd = reduceByKey(_ + _)

这种转换会导致地图端聚合,然后进行洗牌(shuffle),将所有相同的键放入同一个分区。

我的问题是:

  • 是否使用内部的mapPartition转换来实现地图端聚合,从而使用组合器函数汇总所有相同的键,或者使用AppendOnlyMapExternalAppendOnlyMap实现?

  • 如果使用AppendOnlyMapExternalAppendOnlyMap映射进行聚合,它们是否也用于在ResultTask中进行的减少端聚合

  • 这两种映射(AppendOnlyMapExternalAppendOnlyMap)的目的是什么?

  • AppendOnlyMapExternalAppendOnlyMap是否由所有shuffle管理器使用,还是只由sortManager使用?

  • 我读到说,AppendOnlyMapExternalAppendOnlyMap填满后,会被溢写到文件中,具体步骤是如何进行的?

  • 使用Sort shuffle管理器,我们使用appendOnlyMap进行聚合和组合分区记录,对吗?然后当执行内存填满时,我们开始对map进行排序,将其溢写到磁盘上,然后清除map,我的问题是:溢写到磁盘和shuffle write之间有什么区别?它们基本上都是在本地文件系统上创建文件,但它们的处理方式不同。Shuffle写入的记录不会被放入appendOnlyMap。

  • 您能否深入解释一下执行reduceByKey时会发生什么,为实现这一目标所涉及的所有步骤,例如地图端聚合、洗牌等等。

1个回答

6

下面逐步介绍了reduceByKey的描述:

  1. reduceByKey 调用 combineByKeyWithTag,使用相同的合并器和创建值、合并值
  2. combineByKeyWithClassTag 创建一个 Aggregator 并返回 ShuffledRDD。"map" 和 "reduce" 两侧的聚合都使用内部机制,并不使用 mapPartitions
  3. Agregator 使用 ExternalAppendOnlyMap 来进行 combineValuesByKey("map side reduction")和combineCombinersByKey("reduce side reduction")
  4. 这两个方法都使用 ExternalAppendOnlyMap.insertAllMethod
  5. ExternalAppendOnlyMap 跟踪溢出部分 和当前的内存映射 (SizeTrackingAppendOnlyMap)
  6. insertAll 方法更新内存映射,并检查插入时,如果当前映射的估计大小超过阈值,则使用继承的 Spillable.maybeSpill 方法。如果超过阈值,则此方法会作为副作用调用 spill,并将 insertAll 初始化为干净的 SizeTrackingAppendOnlyMap
  7. spill 调用 spillMemoryIteratorToDisk,它从块管理器获取 DiskBlockObjectWriter 对象。

insertAll 步骤应用于具有相应的 Aggregator 函数和中间阶段的洗牌的 map 和 reduce 的聚合。

从 Spark 2.0 开始,只有基于排序的管理器:SPARK-14667


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接