在Spark中,cache()方法会改变RDD的状态还是创建一个新的RDD?

4
这个问题是我之前提出的一个问题(如果我在Spark中两次缓存相同的RDD会发生什么)的跟进。
当对RDD调用cache()时,RDD的状态是否会改变(并且返回的RDD只是为了方便使用而是指向自身),或者会创建一个新的RDD来包装现有的RDD?
下面的代码将会发生什么?
// Init
JavaRDD<String> a = ... // some initialise and calculation functions.
JavaRDD<String> b = a.cache();
JavaRDD<String> c = b.cache();

// Case 1, will 'a' be calculated twice in this case 
// because it's before the cache layer:
a.saveAsTextFile(somePath);
a.saveAsTextFile(somePath);

// Case 2, will the data of the calculation of 'a' 
// be cached in the memory twice in this case
// (once as 'b' and once as 'c'):
c.saveAsTextFile(somePath);
2个回答

6
当在RDD上调用cache()方法时,RDD的状态是否发生了改变(并且返回的RDD只是为方便而存在),还是创建了一个新的封装了现有RDD的RDD。 相同的RDD被返回
/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
}
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}

缓存不会对所述RDD产生任何副作用。如果它已经被标记为持久性,那么什么也不会发生。如果没有,唯一的副作用将是将其注册到SparkContext,其中副作用不在于RDD本身,而是上下文。
编辑:
查看 JavaRDD.cache,似乎底层调用将导致另一个JavaRDD的分配:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())

wrapRDD 调用 JavaRDD.fromRDD 时:

object JavaRDD {

  implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
  implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}

这将导致分配一个新的JavaRDD。尽管如此,RDD[T]的内部实例将保持不变。


1

缓存不会改变RDD的状态。

当发生转换时,缓存计算并将一个RDD材料化到内存中,同时跟踪其血统(依赖关系)。有多个级别的持久性。

由于缓存记住了RDD的血统,在节点故障的情况下,Spark可以重新计算丢失的分区。最后,被缓存的RDD存在于运行应用程序的上下文中,一旦应用程序终止,缓存的RDD也会被删除。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接