Spark + Scala转换、不可变性和内存消耗开销

7

我在Youtube上观看了一些有关Spark架构的视频。

尽管惰性求值、数据容错性以及良好的函数式编程概念是Resilience Distributed Datasets成功的原因,但一个令人担忧的因素是由于多个转换导致的内存开销,因为数据不可变性会导致内存开销。

如果我正确理解了这个概念,每个转换都会创建新的数据集,因此内存需求将增加相应的倍数。如果我的代码中使用了10个转换,那么将创建10组数据集,我的内存消耗将增加10倍。

例如:

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

上面的例子有三个转换:flatMap,map和reduceByKey。这是否意味着对于X大小的数据,我需要3倍的数据内存?
我的理解正确吗?缓存RDD是唯一解决此问题的方法吗?
一旦我开始缓存,由于数据量大,可能会溢出到磁盘,因此性能会受到磁盘IO操作的影响。在这种情况下,Hadoop和Spark的性能可比吗?
编辑:
从答案和评论中,我已经了解了延迟初始化和管道过程。我的假设是X为初始RDD大小时需要3倍的内存不准确。
但是,是否可以将1个X RDD缓存在内存中并在管道中更新它?cache()如何工作?

嗨,我已经和Spark一起工作了一段时间(所以我忘记了很多细节),但你不需要3倍的内存。由于存在您提到的惰性评估,基本上只有一个操作。通过内部优化例程,它基本上只是“一个大”的转换。一旦您开始使用磁盘,性能应该会接近Hadoop的性能。 - Michael Brenndoerfer
@Michael 为什么性能会转到 MR? - Justin Pihony
它仍然更快,但当你开始使用磁盘时,“趋势是朝向Hadoop速度”,这基本上意味着它变慢了。但是你的答案是正确的,通过惰性评估,你无论如何都可以获得性能优势。 - Michael Brenndoerfer
2个回答

12

首先,惰性执行意味着函数组合可以发生:

scala> val rdd = sc.makeRDD(List("This is a test", "This is another test", 
                                 "And yet another test"), 1)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[70] at makeRDD at <console>:27

scala> val counts = rdd.flatMap(line => {println(line);line.split(" ")}).
     | map(word => {println(word);(word,1)}).
     | reduceByKey((x,y) => {println(s"$x+$y");x+y}).
     | collect
This is a test
This
is
a
test
This is another test
This
1+1
is
1+1
another
test
1+1
And yet another test
And
yet
another
1+1
test
2+1
counts: Array[(String, Int)] = Array((And,1), (is,2), (another,2), (a,1), (This,2), (yet,1), (test,3))

首先请注意,我将并行度调整为1,以便我们可以看到单个worker的情况。然后,我在每个转换中添加了println,以便我们可以看到工作流程如何移动。您会发现它处理该行,然后处理该行的输出,接着是减少操作。因此,并没有为每个转换存储单独的状态,就像您建议的那样。相反,每个数据片段都经过了整个转换过程,直到需要洗牌,这可以通过UI中的DAG可视化来看到:

DAG

这就是惰性的优势。至于Spark v Hadoop,已经有很多内容了(只需搜索一下),但要点是Spark倾向于默认利用网络带宽,从而在这方面获得了提升。然后,如果已知模式并且可以利用DataFrames API,则可以通过惰性获得许多性能改进。

因此,总体上,Spark在几乎所有方面都明显击败了MR。


1
抱歉我不理解一个点。flatMap().map().reduceByKey() 会产生三个新的RDD,还是单个RDD,或者是两个RDD? - Ravindra babu
1
通过在每个阶段使用println的简洁而简单的方式来演示流水线概念。很多人都在询问流水线是否以这种方式工作,现在我知道如何轻松地演示它了。感谢您! - Alex Larikov
1
@ravindra,这里会有3个RDD,但新的RDD并不总是意味着数据的移动,因为RDD中的实际数据存储在分区内,子RDD知道其父RDD,这意味着mapflatMap将重用makeRDD创建的分区。相反,reduceByKey需要进行洗牌,这总是数据的移动 - 这就是为什么Spark将DAG拆分为两个阶段的原因。但是,在同一阶段内的转换可以通过Spark进行优化,因此makeRDD-> flatMap-> map将作为单个转换运行在每一行上,并且它将丢弃中间结果而不占用额外的内存。 - Alex Larikov
明白了。虽然它不会变成原始大小的3倍,但仍然比1倍大小要大一些。就内存而言,不可变性有一个缺点。是否可能在缓存中使用相同的RDD,并在管道上按顺序更新它,并将内存要求限制在最初的1倍大小? - Ravindra babu
你唯一能够更新缓存的方式是清空它,然后使底层数据集发生变化。RDD 本身是不可变的。 - Justin Pihony

3
如果您在Spark作业中有10个转换步骤,那么Spark的内存需求并不会增加10倍。当您在作业中指定转换步骤时,Spark会构建一个DAG图,以便执行所有作业步骤。然后,它将作业分解为阶段。一个阶段是一系列Spark可以在数据集上执行而无需洗牌的转换序列。
当RDD触发操作时,Spark会评估DAG。它只需将一个阶段中的所有转换应用到一起,直到达到该阶段的末尾,因此除非每个转换都导致洗牌(在这种情况下,可能是编写不良的作业),否则内存压力不太可能增加10倍。
我建议观看此次演讲并浏览幻灯片

我现在对10 X很清楚了。你也能回答第二个问题吗? - Ravindra babu
你不能对一个RDD进行cache()并在此之后更新它。这不像我们通常谈论的缓存。它只是在DAG中的某个特定点缓存状态。它只确保图中先前的步骤不会被多次计算。 - Saket
幻灯片的链接已更改。现在它将引导您到网站的主页。 - Itération 122442

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接