不需要显式调用"cache"或"persist"来将RDD数据存储到内存中。RDD数据默认以分布式方式存储在内存中。
val textFile = sc.textFile("/user/emp.txt")
据我了解,完成上述步骤后,textFile是一个RDD并且可以在所有/某些节点的内存中使用。
如果是这样,为什么我们还需要对textFile RDD调用“cache”或“persist”呢?
val textFile = sc.textFile("/user/emp.txt")
据我了解,完成上述步骤后,textFile是一个RDD并且可以在所有/某些节点的内存中使用。
如果是这样,为什么我们还需要对textFile RDD调用“cache”或“persist”呢?
大多数RDD操作都是惰性的。将RDD视为一系列操作的描述。RDD不是数据。因此,这行代码:
val textFile = sc.textFile("/user/emp.txt")
它并没有实际操作,它会创建一个RDD,表示“我们需要加载此文件”。这时候文件并没有被加载。RDD.count
用于告诉你文件中有多少行,因此需要读取文件内容。如果你写了textFile.count
,那么此时文件将被读取,行数将被计算,并返回计数。textFile.count
呢?同样的事情会发生:文件将会被再次读取和计数。没有任何东西被存储起来。RDD本身并不是数据。RDD.cache
有什么作用呢?如果在上述代码中添加textFile.cache
:val textFile = sc.textFile("/user/emp.txt")
textFile.cache
它什么也不做。RDD.cache
也是一个惰性操作。文件仍然没有被读取。但现在RDD会说“读取这个文件,然后缓存内容”。如果你运行textFile.count
第一次,文件将被加载、缓存和计数。如果你第二次调用textFile.count
,该操作将使用缓存。它只会从缓存中获取数据并计算行数。
缓存行为取决于可用内存。例如,如果文件无法适应内存,则textFile.count
将返回到通常的行为并重新读取文件。
我认为问题最好重新表述一下:
Spark处理过程是懒惰的,也就是说,除非必须执行,否则不会发生任何事情。
简单回答这个问题,在执行val textFile = sc.textFile("/user/emp.txt")
之后,数据不会发生任何变化,只会构建一个HadoopRDD
来使用该文件作为数据源。
假设我们对该数据进行了一些转换:
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
再次强调,数据并没有发生任何变化。现在有一个新的RDD wordsRDD
,它包含对testFile
的引用以及当需要时要应用的函数。
仅当对RDD执行操作时,例如wordsRDD.count
,RDD链(称为lineage)才会被执行。也就是说,数据将被分区加载到Spark集群的执行程序中,将应用flatMap
函数并计算结果。
在像本示例中那样的线性衍生上,不需要使用cache()
。数据将被加载到执行程序中,所有转换都将被应用,最后将计算出count
- 如果数据适合内存,所有这些都在内存中完成。
cache
在RDD衍生出多个分支时非常有用。假设你想将前面示例中的单词过滤成正向和负向词汇的数量。您可以按以下方式执行此操作:
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
在这里,每个分支都会重新加载数据。添加显式的cache
语句将确保之前进行的处理被保留和重复使用。工作将如下所示:
val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
因此,缓存
被称为“打破血统”,因为它创建了一个可以用于进一步处理的检查点。
经验法则:当您的RDD的血统分支出或者RDD在循环中被多次使用时,请使用缓存
。
spark.storage.memoryFraction
控制。关于哪个执行器拥有哪些数据,RDD 将跟踪其分布在执行器上的分区。 - maasgcache
和persist
都不能 _打破血统关系(lineage)_。 - zero323我们需要显式地调用“cache”或“persist”来将RDD数据存储到内存中吗?
是的,只有在需要的情况下才需要这样做。
RDD数据默认以分布式方式存储于内存中吗?
不是!
以下是原因:
Spark支持两种类型的共享变量:广播变量和累加器。广播变量可用于在所有节点上缓存值,而累加器则是仅进行“添加”的变量,例如计数器和总和。
RDD支持两种操作:转换和动作。转换从现有数据集创建新的数据集,而动作在对数据集运行计算后返回值给驱动程序。例如,map是一种转换,它通过函数传递每个数据集元素并返回表示结果的新RDD。另一方面,reduce是一种动作,使用某些功能聚合RDD的所有元素,并将最终结果返回给驱动程序(尽管还有一个并行的reduceByKey,它返回分布式数据集)。
Spark中的所有转换都是惰性的,即它们不会立即计算其结果。相反,它们只记住应用于某些基本数据集(例如文件)的转换。只有当动作需要返回结果给驱动程序时,才会计算这些转换。这种设计使Spark能够更有效地运行-例如,我们可以意识到通过map创建的数据集将在一个reduce中使用,并仅返回减少结果而不是较大的映射数据集。
默认情况下,每个转换后的RDD在每次运行动作时都可能被重新计算。然而,您也可以使用persist(或cache)方法将RDD存储在内存中,在这种情况下,Spark将在集群上保留元素以便在下次查询时更快地访问。还支持将RDD持久化到磁盘上,或者复制到多个节点。
更多详情请查看Spark编程指南。
以下是应该缓存RDD的三种情况:
多次使用同一个RDD
对同一个RDD执行多个操作
长链式(或非常昂贵的)转换
添加(或临时添加)cache
方法调用的另一个原因是为了调试内存问题。
使用cache
方法,Spark将提供有关RDD大小的调试信息。因此,在Spark集成的UI中,您将获得RDD内存消耗信息。这对于诊断内存问题非常有帮助。