Does cache() in Spark change the state of the RDD?

Posted 2019-06-28 03:33

Question:

This question is a follow-up to a previous question of mine: What happens if I cache the same RDD twice in Spark.

When calling cache() on an RDD, does the state of the RDD change (and the returned RDD is just this for ease of use), or is a new RDD created that wraps the existing one?

What will happen in the following code:

// Init
JavaRDD<String> a = ... // some initialisation and calculation functions.
JavaRDD<String> b = a.cache();
JavaRDD<String> c = b.cache();

// Case 1, will 'a' be calculated twice in this case 
// because it's before the cache layer:
a.saveAsTextFile(somePath);
a.saveAsTextFile(somePath);

// Case 2, will the data of the calculation of 'a' 
// be cached in memory twice in this case
// (once as 'b' and once as 'c'):
c.saveAsTextFile(somePath);

Answer 1:

When calling cache() on an RDD, does the state of the RDD change (and the returned RDD is just this for ease of use), or is a new RDD created that wraps the existing one?

The same RDD is returned:

/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}

Caching doesn't cause any side effect on the RDD in question. If it's already marked for persistence, nothing happens. If it isn't, the only side effect is registering it with the SparkContext, so the side effect isn't on the RDD itself but on the context.
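To see this in practice, here is a minimal Java sketch (the data, the local master and the class name are made up for illustration): marking the same RDD for caching twice is a no-op the second time, while asking for a different storage level afterwards hits the guard shown above.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class CacheTwice {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("cache-twice").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> a = sc.parallelize(Arrays.asList("x", "y", "z"));

    // The first cache() sets the storage level to MEMORY_ONLY and registers
    // the RDD with the SparkContext; the second call sees the same level
    // already set and simply returns.
    JavaRDD<String> b = a.cache();
    JavaRDD<String> c = b.cache();

    // Requesting a *different* level afterwards triggers the guard in
    // RDD.persist and throws UnsupportedOperationException.
    try {
      a.persist(StorageLevel.DISK_ONLY());
    } catch (UnsupportedOperationException e) {
      System.out.println("As expected: " + e.getMessage());
    }

    sc.stop();
  }
}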

Edit:

Looking at JavaRDD.cache, it seems that the underlying call will cause the allocation of another JavaRDD:

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())

Where wrapRDD calls JavaRDD.fromRDD:

object JavaRDD {

  implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
  implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}

Each call therefore allocates a new JavaRDD wrapper. That said, the internal RDD[T] instance it wraps remains the same.
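A quick hedged check of that from the Java side (reusing the hypothetical sc and sample data from the sketch above): the wrappers returned by each cache() call are distinct objects, but rdd() shows they share the same underlying RDD.

// Assuming the same JavaSparkContext `sc` as in the sketch above.
JavaRDD<String> a = sc.parallelize(Arrays.asList("x", "y", "z"));
JavaRDD<String> b = a.cache();
JavaRDD<String> c = b.cache();

// Each cache() goes through wrapRDD/fromRDD, so the JavaRDD wrappers differ...
System.out.println(a == b); // false
System.out.println(b == c); // false

// ...but they all wrap the very same RDD[T] instance, because RDD.persist
// returns `this`.
System.out.println(a.rdd() == b.rdd()); // true
System.out.println(b.rdd() == c.rdd()); // true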



Answer 2:

Caching doesn't change the state of an RDD.

Calling cache() marks the RDD for persistence; the data is computed and materialized in memory the first time an action runs on it, while Spark keeps track of its lineage (dependencies). There are many levels of persistence to choose from.
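As a hedged illustration (assuming an existing JavaSparkContext sc and a made-up input path): the level is picked via persist(), of which cache() is just the MEMORY_ONLY shorthand, and nothing is materialized until the first action runs.

// A sketch assuming an existing JavaSparkContext `sc`; the input path is made up.
JavaRDD<String> lines = sc.textFile("/tmp/example-input");

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY()); other levels
// such as MEMORY_AND_DISK, MEMORY_ONLY_SER or DISK_ONLY are chosen explicitly.
lines.persist(StorageLevel.MEMORY_AND_DISK());

// Persistence is lazy: the first action computes the lineage and fills the
// cache, later actions read the cached partitions instead of recomputing.
long first = lines.count();
long second = lines.count();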

Since caching remembers an RDD's lineage, Spark can recompute lost partitions in the event of node failures. Lastly, a cached RDD lives within the context of the running application; once the application terminates, the cached data is dropped as well.
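If the memory should be reclaimed before the application ends, the RDD can be dropped from the cache explicitly; a brief sketch continuing from the snippet above:

// Continuing from `lines` above: drop the cached partitions once they are no
// longer needed, instead of waiting for the application to terminate.
lines.unpersist();         // mark as non-persistent and remove its blocks
// lines.unpersist(false); // variant that does not wait for the blocks to be removed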