在Spark Streaming中,我是否必须在cache()或persist()之后调用count()才能强制缓存/持久化真正发生?

5

观看这个关于Spark内部的非常好的视频,演讲者说,除非在缓存RDD后执行操作,否则缓存不会真正发生。

在其他情况下,我从未看到count()被调用。因此,我猜他只在cache()之后调用count()来强制持久化在他所给出的简单示例中。在代码中每次调用cache()或persist()都不需要这样做。这是正确的吗?

2个回答

6
除非在缓存RDD之后执行某个操作,否则缓存实际上不会发生。这是100%正确的。方法cache / persist只会标记RDD进行缓存。当对RDD调用操作时,缓存RDD中的项。您再次100%正确,但我会详细说明一下。为了便于理解,请考虑以下示例。
rdd.cache()
rdd.map(...).flatMap(...) //and so on
rdd.count() //or any other action

假设您的RDD中有10个文档。运行以上代码段时,每个文档都会经历以下任务:
  • 缓存
  • map函数
  • flatMap函数
另一方面,
rdd.cache().count()  
rdd.map(...).flatMap(...)  //and so on
rdd.count()  //or any other action

当上述代码段被运行时,首先缓存了所有10个文档(整个RDD)。然后应用map函数和flatMap函数。
两种方法都是正确的,根据需求使用。希望这能让事情更清晰明了。

如果我使用Scala的par并行处理,将RDD数据复制到3个不同的服务器上,那么在这种情况下哪种方法更好呢?代码如下:serverIPlist.par.map(copyDataToSolrFunction)。在这种情况下,我应该使用方法1还是方法2? - Aman Tandon

2
.cache().persist()都是转换操作(而不是动作),因此当您调用它们时,会将它们添加到DAG中。如下图所示,缓存/持久化的rdd/dataframe在点上呈绿色。

enter image description here

当您在许多转换后有一个动作(.count().save().show()等)时,立即执行另一个动作并不重要。 根据@code的示例:

// 1 CASE: cache/persist the initial rdd
rdd.cache()
rdd.count() // It forces the cache but it DOESNT need because we have the 2nd count.
rdd.map(...).flatMap(...) # Transformations
rdd.count()  //or any other action

// 2 CASE: cache/persist the transformed rdd
rdd.map(...).flatMap(...) # Transformations
rdd.cache()
rdd.count()  //or any other action

我的观点是,如果您不需要操作的结果,请勿强制进行缓存/持久化,因为这样会计算出一些无用的东西。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接