Apache Spark RDD缓存和Lineage混淆

3
我往RDD中填充一些随机的数学值:
val itemFactors = rddItems.mapValues(newFactors => 
    Vector(Array.fill(2){math.random})
)

然后,我将该RDD与其他RDD连接并缓存:

val finalRDD = itemFactors.join(rddItemsUsers).map{
    case(itemid, (itemVector, ((userid, rating), userVector))) => 
        (itemid, itemVector, userid, userVector, rating)}.cache

然后我对finalRDD中保存的数据进行计算:

sqrt(finalRDD.aggregate(0.0)((accum, item) => 
    accum + pow(item._5 - item._4.dot(item._2), 2), _ + _) / finalRDD.count)

我从控制台多次调用代码的最后一部分,即sqrt(...),但每次都得到不同的结果 - 这是不希望出现的,因为我什么也没有改变!有两种方法可以解决这个问题(即使我得到一致的结果):
  • 不要使用math.random初始化itemFactors,而是使用一个固定数值(例如1.0)填充数组。

  • 使用itemFactors.cache

现在,我明白了由于继承关系,每次调用itemFactors时都会调用math.random并创建一个新的数字 - 这将影响我的计算。因此,使用固定数字填充数组会产生一致的结果。
但是,大问题和我不理解的部分是:我正在缓存finalRDD,这是计算执行的内容,由itemFactors组成,所以节点只被访问一次,那么itemFactor的数组填充什么内容似乎并不重要?我以为我开始理解继承关系了,然而这却让我感到困惑。

1
cache()不能保证它总是被缓存--例如,您是否有足够的内存来保存它?它的缓存百分比是多少?如果丢失一个块,它也会被重新计算。 - Sean Owen
不太确定如何检查,但我检查了 getStorageLevel 并得到了:StorageLevel(false, true, false, true, 1)。如果我执行 getStorageLevel.description,我会得到:Memory Deserialized 1x ReplicatedgetStorageLevel.useMemory 返回 True - monster
你能否提供一个可重现的例子吗?我不知道rddItemsrddItemsUsers里面是什么。可能是小矮人。 - Daniel Darabos
您可以在UI的存储选项卡中检查缓存的分数。 - Daniel Darabos
我会尽快制作一个可重现的示例。感谢您提供的UI提示。 - monster
你解决了吗? :) 我也有同样的疑问。 - SuperAadi
1个回答

3

如果您的缓存无法适应内存,则会根据LRU策略进行丢失。

为了避免这种情况,您可以使用persist函数,并按照所示传递参数。

val result = input.map(x => x*x)
result.persist(MEMORY_ONLY)

MEMORY_ONLY - 将RDD存储为JVM中反序列化的Java对象。如果RDD不适合内存,一些分区将无法缓存,并在每次需要时动态重新计算。这是默认级别。

MEMORY_AND_DISK - 将RDD存储为JVM中反序列化的Java对象。如果RDD不适合内存,则将不适合的分区存储在磁盘上,并在需要时从磁盘读取。

MEMORY_ONLY_SER - 将RDD存储为序列化的Java对象(每个分区一个字节数组)。通常情况下,使用快速序列化器时比反序列化对象更省空间,但读取时会更耗费CPU资源。

MEMORY_AND_DISK_SER - 与MEMORY_ONLY_SER类似,但将不适合内存的分区溢写到磁盘,而不是每次需要时动态重新计算。

DISK_ONLY - 只在磁盘上存储RDD分区。

MEMORY_ONLY_2、MEMORY_AND_DISK_2等 - 与上述级别相同,但在两个集群节点上复制每个分区。

OFF_HEAP(实验性)- 在Tachyon中以序列化格式存储RDD。与MEMORY_ONLY_SER相比,OFF_HEAP减少了垃圾收集开销,并允许执行器更小并共享内存池,因此在具有大堆或多个并发应用程序的环境中非常有吸引力。此外,由于RDD位于Tachyon中,执行器的崩溃不会导致失去内存缓存。在此模式下,Tachyon中的内存是可丢弃的。因此,Tachyon不会尝试重构从内存中驱逐的块。

请参考此链接以获取更多文档


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接