An RDD has a lineage and therefore does not exist until an action is performed on it; so, if I have a method that performs numerous transformations on an RDD and returns the transformed RDD, what am I actually returning?
Am I returning nothing until that RDD is required by an action? If I cache an RDD inside the method, does it persist in the cache? I think the answer is: the method's work will only be run when an action is called on the RDD it returns. But I could be wrong.
An extension to this question is:
If I have a tail-recursive method that takes an RDD as a parameter and returns an RDD, but I am caching RDDs within the method:
def method(myRDD: RDD): RDD = {
  ...
  anRDD.cache()
  if (true) return someRDD // placeholder base case
  method(someRDD) // tail-recursive call
}
Then, when a tail-recursive call happens, does it overwrite the previously cached RDD anRDD, or do both persist? I'd imagine both persist. I am seeing data spilled to disk even though the dataset I'm using is only 63 MB, and I think it could have something to do with the tail-recursive method.
The RDD lineage is built as a graph of RDD object instances linked together, where every node in the lineage holds a reference to its dependencies. In its simplest chain form, you can see it as a linked list:
hadoopRDD(location) <-depends- filteredRDD(f:A->Boolean) <-depends- mappedRDD(f:A->B)
You can see this in the base RDD constructor:
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
To come to the point: in the same way that we can recursively build a linked list, we can also recursively build an RDD lineage. The result of a recursive function that acts on RDDs will be a well-defined RDD.
An action will be required to schedule that lineage for execution and materialize the computation it represents, much like "walking" a linked list once it has been created.
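To make the linked-list analogy concrete, here is a toy model in plain Scala (not Spark; MiniRDD, Source, Mapped, and Filtered are invented names for illustration): transformations only build lineage nodes, and a recursive function over them returns instantly, handing back a well-defined but uncomputed structure that only the "action" walks.

```scala
// Toy model in plain Scala (not Spark): transformations only build lineage
// nodes; nothing is computed until the "action" (collect) walks the chain.
object MiniLineage {
  sealed trait MiniRDD[A] {
    def map[B](f: A => B): MiniRDD[B] = Mapped(this, f)
    def filter(p: A => Boolean): MiniRDD[A] = Filtered(this, p)
    def collect(): Seq[A] // the only "action"
  }
  case class Source[A](data: Seq[A]) extends MiniRDD[A] {
    def collect(): Seq[A] = data
  }
  case class Mapped[A, B](parent: MiniRDD[A], f: A => B) extends MiniRDD[B] {
    def collect(): Seq[B] = parent.collect().map(f)
  }
  case class Filtered[A](parent: MiniRDD[A], p: A => Boolean) extends MiniRDD[A] {
    def collect(): Seq[A] = parent.collect().filter(p)
  }
}
import MiniLineage._

// A recursive function that only *builds* lineage: each call wraps the
// previous node in a new one and returns immediately.
def incTimes(rdd: MiniRDD[Int], i: Int): MiniRDD[Int] =
  if (i <= 0) rdd else incTimes(rdd.map(_ + 1), i - 1)

val lineage = incTimes(Source(Seq(1, 2, 3)), 3) // no computation happens here
val result = lineage.collect() // the action walks the chain: 4, 5, 6
```

The same holds for Spark: the recursion terminates quickly because it never touches the data, only the lineage graph.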
Consider this (rather contrived, I must admit) example:
def isPrime(n: Int): Boolean =
  (n == 2) || (n % 2 != 0 && !((3 to math.sqrt(n).ceil.toInt) exists (x => n % x == 0)))
def recPrimeFilter(rdd: RDD[Int], i: Int): RDD[Int] =
  if (i <= 1) rdd
  else if (isPrime(i)) recPrimeFilter(rdd.filter(x => x != i), i - 1)
  else recPrimeFilter(rdd.map(x => x + i), i - 1)
When applied to an RDD of Ints, we can observe the lineage, with filters and maps interleaved according to where the primes fall:
val rdd = sc.parallelize(1 to 100)
val res = recPrimeFilter(rdd, 15)
scala> res.toDebugString
res3: String =
(8) FilteredRDD[54] at filter at <console>:18 []
| FilteredRDD[53] at filter at <console>:18 []
| MappedRDD[52] at map at <console>:18 []
| FilteredRDD[51] at filter at <console>:18 []
| MappedRDD[50] at map at <console>:18 []
| FilteredRDD[49] at filter at <console>:18 []
| MappedRDD[48] at map at <console>:18 []
| MappedRDD[47] at map at <console>:18 []
| MappedRDD[46] at map at <console>:18 []
| FilteredRDD[45] at filter at <console>:18 []
| MappedRDD[44] at map at <console>:18 []
| FilteredRDD[43] at filter at <console>:18 []
| MappedRDD[42] at map at <console>:18 []
| MappedRDD[41] at map at <console>:18 []
| ParallelCollectionRDD[33] at parallelize at <console>:13 []
'cache' breaks the lineage: the RDD marked for caching "remembers" its contents the first time it is computed, so that all RDDs that depend on it can reuse that cached data instead of recomputing it.
In the basic case of a linear RDD lineage with a single action, it will have no effect at all, because each node is visited only once.
Caching, in this case, could make sense if the recursive RDD construction process creates a graph or tree-like structure where actions are called at many different 'leaf' nodes.
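That trade-off can be sketched in plain Scala rather than Spark (parentEvals, computeParent, and the branch values are all invented for illustration): a shared parent computation runs once per "action" unless it is materialized once and reused, which is what caching buys you when the lineage branches.

```scala
// Plain-Scala sketch (not Spark): count how often a shared "parent"
// computation runs, with and without a materialized ("cached") copy.
var parentEvals = 0
def computeParent(): Seq[Int] = { parentEvals += 1; (1 to 5).toSeq }

// No caching: two "actions" on branches sharing the parent recompute it.
val a1 = computeParent().map(_ * 2)         // action on branch A
val b1 = computeParent().filter(_ % 2 == 1) // action on branch B
val evalsWithoutCache = parentEvals // 2: the parent ran once per branch

// "Caching": materialize the parent once; both branches reuse the result.
parentEvals = 0
lazy val cachedParent = computeParent() // computed on first access only
val a2 = cachedParent.map(_ * 2)
val b2 = cachedParent.filter(_ % 2 == 1)
val evalsWithCache = parentEvals // 1: both branches reused the cached data
```

This also hints at an answer to the spilling question: each `cache` call in the recursion marks a distinct RDD for storage, so the cached copies accumulate rather than overwrite one another.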