为什么需要在RDD上调用cache或persist方法?

192
当从文本文件或集合(或另一个RDD)创建一个弹性分布式数据集(RDD)时,我们需要显式地调用"cache"或"persist"来将RDD数据存储到内存中吗?还是RDD数据默认以分布式方式存储在内存中?
不需要显式调用"cache"或"persist"来将RDD数据存储到内存中。RDD数据默认以分布式方式存储在内存中。
val textFile = sc.textFile("/user/emp.txt")

据我了解,完成上述步骤后,textFile是一个RDD并且可以在所有/某些节点的内存中使用。

如果是这样,为什么我们还需要对textFile RDD调用“cache”或“persist”呢?

5个回答

336

大多数RDD操作都是惰性的。将RDD视为一系列操作的描述。RDD不是数据。因此,这行代码:

val textFile = sc.textFile("/user/emp.txt")
它并没有实际操作,它会创建一个RDD,表示“我们需要加载此文件”。这时候文件并没有被加载。
需要观察数据内容的RDD操作是不能懒惰的。这些操作被称为“actions”。例如,RDD.count用于告诉你文件中有多少行,因此需要读取文件内容。如果你写了textFile.count,那么此时文件将被读取,行数将被计算,并返回计数。
如果你再次调用 textFile.count 呢?同样的事情会发生:文件将会被再次读取和计数。没有任何东西被存储起来。RDD本身并不是数据。
那么RDD.cache有什么作用呢?如果在上述代码中添加textFile.cache
val textFile = sc.textFile("/user/emp.txt")
textFile.cache

它什么也不做。RDD.cache也是一个惰性操作。文件仍然没有被读取。但现在RDD会说“读取这个文件,然后缓存内容”。如果你运行textFile.count第一次,文件将被加载、缓存和计数。如果你第二次调用textFile.count,该操作将使用缓存。它只会从缓存中获取数据并计算行数。

缓存行为取决于可用内存。例如,如果文件无法适应内存,则textFile.count将返回到通常的行为并重新读取文件。


4
嗨,丹尼尔,当你调用缓存时,这是否意味着RDD不会重新从源(例如文本文件)加载?当缓存数据时,如何确保来自文本文件的数据是最新的?(Spark是否能够自动处理,还是需要定期手动取消持久化以确保后续重算源数据?) - andrew.butkus
如果必须定期取消持久化,如果您有一个已缓存的RDD,依赖于另一个已缓存的RDD,那么您是否必须取消持久化这两个RDD才能看到重新计算的结果? - andrew.butkus
24
Spark假设文件不会改变。它在任意时间点读取文件,并可能稍后重新读取其中的某些部分(例如,如果数据的一部分已经被推出缓存)。因此,最好保持文件不变!当您有新数据时,只需创建一个新文件并赋予不同的名称,然后将其作为新RDD加载。如果您不断获得新数据,请考虑使用Spark Streaming。 - Daniel Darabos
10
是的,RDD是不可变的,因此每个RDD都假定其依赖项也是不可变的。Spark Streaming允许您设置这样的树,以对更改流进行操作。但更简单的解决方案是在以文件名为参数的函数中构建树。然后只需为新文件调用该函数,就可以得到新的计算树。 - Daniel Darabos
1
@Humoyun:在Spark UI的存储选项卡上,您可以查看每个RDD有多少缓存。数据可能非常大,以至于只有40%适合用于缓存的总内存。在这种情况下,一种选择是使用“persist”并选择允许将缓存数据溢出到磁盘的存储选项。 - Daniel Darabos
显示剩余2条评论

210

我认为问题最好重新表述一下:

什么时候需要在RDD上调用缓存或持久化操作?

Spark处理过程是懒惰的,也就是说,除非必须执行,否则不会发生任何事情。 简单回答这个问题,在执行val textFile = sc.textFile("/user/emp.txt")之后,数据不会发生任何变化,只会构建一个HadoopRDD来使用该文件作为数据源。

假设我们对该数据进行了一些转换:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

再次强调,数据并没有发生任何变化。现在有一个新的RDD wordsRDD,它包含对testFile的引用以及当需要时要应用的函数。

仅当对RDD执行操作时,例如wordsRDD.count,RDD链(称为lineage)才会被执行。也就是说,数据将被分区加载到Spark集群的执行程序中,将应用flatMap函数并计算结果。

在像本示例中那样的线性衍生上,不需要使用cache()。数据将被加载到执行程序中,所有转换都将被应用,最后将计算出count - 如果数据适合内存,所有这些都在内存中完成。

cache在RDD衍生出多个分支时非常有用。假设你想将前面示例中的单词过滤成正向和负向词汇的数量。您可以按以下方式执行此操作:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

在这里,每个分支都会重新加载数据。添加显式的cache语句将确保之前进行的处理被保留和重复使用。工作将如下所示:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

因此,缓存被称为“打破血统”,因为它创建了一个可以用于进一步处理的检查点。

经验法则:当您的RDD的血统分支出或者RDD在循环中被多次使用时,请使用缓存


2
太棒了,谢谢。还有一个相关的问题。当我们缓存或持久化数据时,数据将存储在执行器的内存或工作节点的内存中。如果是执行器的内存,Spark如何识别哪个执行器拥有数据。 - user1261215
1
@RamanaUppala 执行器内存被使用。用于缓存的执行器内存比例由配置 spark.storage.memoryFraction 控制。关于哪个执行器拥有哪些数据,RDD 将跟踪其分布在执行器上的分区。 - maasg
6
如果我理解正确的话,@maasg之前说过cachepersist都不能 _打破血统关系(lineage)_。 - zero323
如果在这两个计数之前,我们将这两个分支合并回一个RDD并进行计数,那么缓存是否有益呢? - Xiawei Zhang
当我们在具有8个分区的DataFrame上进行广播时,Spark首先会将所有数据发送到Driver,然后Driver再进行广播,还是有其他的方式? - Sankar
显示剩余2条评论

31

我们需要显式地调用“cache”或“persist”来将RDD数据存储到内存中吗?

是的,只有在需要的情况下才需要这样做。

RDD数据默认以分布式方式存储于内存中吗?

不是!

以下是原因:

  • Spark支持两种类型的共享变量:广播变量和累加器。广播变量可用于在所有节点上缓存值,而累加器则是仅进行“添加”的变量,例如计数器和总和。

  • RDD支持两种操作:转换和动作。转换从现有数据集创建新的数据集,而动作在对数据集运行计算后返回值给驱动程序。例如,map是一种转换,它通过函数传递每个数据集元素并返回表示结果的新RDD。另一方面,reduce是一种动作,使用某些功能聚合RDD的所有元素,并将最终结果返回给驱动程序(尽管还有一个并行的reduceByKey,它返回分布式数据集)。

  • Spark中的所有转换都是惰性的,即它们不会立即计算其结果。相反,它们只记住应用于某些基本数据集(例如文件)的转换。只有当动作需要返回结果给驱动程序时,才会计算这些转换。这种设计使Spark能够更有效地运行-例如,我们可以意识到通过map创建的数据集将在一个reduce中使用,并仅返回减少结果而不是较大的映射数据集。

  • 默认情况下,每个转换后的RDD在每次运行动作时都可能被重新计算。然而,您也可以使用persist(或cache)方法将RDD存储在内存中,在这种情况下,Spark将在集群上保留元素以便在下次查询时更快地访问。还支持将RDD持久化到磁盘上,或者复制到多个节点。

更多详情请查看Spark编程指南


1
那并没有回答我的问题。 - user1261215
1
当 RDD 的数据默认存储在内存中时,为什么需要调用 Cache 或 Persist? - user1261215
默认情况下,RDD不会被存储在内存中,因此持久化RDD可以使Spark在集群上更快地执行转换。 - eliasah
当我们调用persist或cache时,它才真正成为“弹性分布式数据集”。否则,它不是分布式数据集,数据也不会存储在内存中。这正确吗? - user1261215
2
这是一个很好的答案,我不知道为什么会被踩。它是自上而下的回答,从高层次概念解释了RDD的工作原理。我添加了另一个自下而上的答案:从“这行代码做了什么”开始。对于刚开始使用Spark的人来说,可能更容易理解。 - Daniel Darabos
显示剩余4条评论

12

以下是应该缓存RDD的三种情况:

多次使用同一个RDD

对同一个RDD执行多个操作

长链式(或非常昂贵的)转换


8

添加(或临时添加)cache方法调用的另一个原因是为了调试内存问题。

调试内存问题

使用cache方法,Spark将提供有关RDD大小的调试信息。因此,在Spark集成的UI中,您将获得RDD内存消耗信息。这对于诊断内存问题非常有帮助。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接