如何正确使用cache()函数?

3

我正在使用Spark 1.1.0,并尝试将图形加载到GraphX中。 我的代码的一部分如下:

val distinct = context.union(r1, r2).distinct;
distinct.cache()

val zipped = distinct.zipWithUniqueId
zipped.cache
distinct.unpersist(false)

当我在集群上执行时,首先执行的是:

distinct at Test.scala:72

但是,当这个操作完成后,我在Spark UI的“存储”选项卡中看不到任何条目。下一步是:

zipWithUniqueId at Test.scala:78

但是紧接着它又开始了下面的内容:
distinct at Test.scala:72

这个结果应该被缓存吗?如果一个RDD只使用一次,那么缓存它是否有用?

编辑:

我忘记提到我还在Test.scala:78处获得了一个抓取失败(Fetch failure)

解决Fetch问题的可能方案

可能的解决方案在这里描述,这可能是Spark版本1.1.0的一个错误。

来自spark-user邮件列表的Andrew Ash提出的其他可能解决方案:

目前似乎有3个问题会引起FetchFailures 1.1:

1)执行器上的长GC(超过spark.core.connection.ack.wait.timeout默认值60秒)

2)打开太多文件(达到内核限制ulimit-n)

3)正在跟踪该票证上的某些未确定问题

来源

1个回答

4

cache将在第一次评估RDD时应用。这意味着,为了有效,cache应该先于某个生成RDD的动作,您将多次使用该RDD。

考虑到cache在RDD评估时应用,如果您具有仅执行一次的线性RDD谱系,则缓存将仅占用内存而不提供任何优势。

因此,如果您的流水线是:

val distinct = context.union(r1, r2).distinct;
val zipped = distinct.zipWithUniqueId
zipped.cache

如果你不需要在后续操作中再次访问distinct数据,则在distinctzipped之间使用cache是没有用处的。考虑到你立即进行了unpersisting,我认为这并非如此。

简而言之,只有当评估的RDD将被多次使用时才使用.cache。(例如,迭代算法,查找等)

缓存spark-shell示例:

val rdd = sc.makeRDD( 1 to 1000)
val cached = rdd.cache // at this point, nothing in the console

SparkUI persistence tab: no persisted RDDs

cached.count // at this point, you can see cached in the console
res0: Long = 1000

SparkUI persistence tab: cached RDD is available

val zipped = cached.zipWithUniqueId
val zipcache = zipped.cache // again nothing new on the UI
val zipcache.first // first is an action and will trigger RDD evaluation

SparkUI showing the 2nd cached rdd

cached.unpersist(blocking=true) // force immediate unpersist

SparkUI not showing unpersisted RDD anymore


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接