为什么Spark一直在重新计算RDD?

5
我使用flatMap创建了一个RDD。之后,我对它进行范围分区。如果我保留原始RDD,一切都可以正常工作。然而,如果我不缓存它,范围分区部分会以某种方式重新计算原始RDD的部分内容。我理解如果我的系统内存不足够,但在这种情况下,我系统中的内存远远超过RDD所占用的内存。其次,该RDD的计算非常耗时,因此这种重启/重新计算会严重影响性能。造成这种奇怪行为的原因是什么呢?
附言:我只使用RDD一次。因此,这不应该发生。

这是RDD的一个真实而令人沮丧的方面。 - WestCoastProjects
2个回答

8

这就是Spark的工作方式:

当你持久化一个RDD时,每个节点会将其计算出的任何分区存储在内存中,并在该数据集(或从中派生出的数据集)的其他操作中重复使用它们。

因此,如果您不持久化RDD,则不会发生上述情况。如果您要多次使用RDD并且有足够的内存,通常需要将其持久化。

这不能自动完成,因为Spark无法知道您是否会重复使用RDD:例如,您可以计算一个RDD,然后对其进行sample操作,并使用结果来决定是否要对RDD进行其他操作,因此RDD是否被使用两次取决于随机数生成器。


1
但在这种情况下,我只使用我的原始RDD一次。我的意思是,在创建此RDD之后,我对其进行分区,然后稍后使用该分区的RDD。因此,是的,我只使用它一次。 - pythonic
那么从它继承的任何RDD呢?它们也只使用一次吗?如果是这样,那可能是一个bug(没有看到实际代码无法确定)。 - Alexey Romanov

4
如果您没有使用RDD.cache,那么RDD的计算结果将不会被保留在内存中。例如(有一个名为rdd_test的RDD数据)。
val rdd_test: RDD[Int] = sc.makeRDD(Array(1,2,3), 1)
val a = rdd_test.map(_+1)
val b = a.map(_+1)

现在,ab这三个RDD数据未存储在内存中。因此,在val c = b.map(_+1)时,ab将会重新计算。如果我们对a和b使用缓存:
val rdd_test: RDD[Int] = sc.makeRDD(Array(1,2,3), 1)
val a = rdd_test.map(_+1).cache
val b = a.map(_+1).cache

那么当执行val c = b.map(_+1)时,ab不会重新计算。

需要注意的是:如果内存不足,cache方法将失败,此时ab将会重新计算。

很抱歉,我不太擅长英语。


我只使用我的RDD一次。而在你的第一个例子中,你也只使用了一次,那么为什么需要缓存呢? - pythonic

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接