Spark RDDs are constructed in an immutable, fault-tolerant and resilient manner.
Do RDDs satisfy immutability in all scenarios? Or is there any case, be it in Streaming or Core, where an RDD might fail to satisfy immutability?
It depends on what you mean when you talk about RDD. Strictly speaking an RDD is just a description of lineage which exists only on the driver, and it doesn't provide any methods which can be used to mutate its lineage.
When data is processed we can no longer talk about RDDs but about tasks; nevertheless data is exposed using immutable data structures (scala.collection.Iterator in Scala, itertools.chain in Python).
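As a minimal sketch (assuming a spark-shell session where sc is already available): transformations never modify the RDD they are called on, they only return a new RDD with an extended lineage.

val original = sc.parallelize(1 to 3)
val transformed = original.map(_ * 2)  // returns a new RDD; original is untouched
original.collect     // Array(1, 2, 3)
transformed.collect  // Array(2, 4, 6)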
So far so good. Unfortunately, immutability of a data structure doesn't imply immutability of the stored data. Let's create a small example to illustrate that:
val rdd = sc.parallelize(Array(0) :: Array(0) :: Array(0) :: Nil)
rdd.map(a => { a(0) +=1; a.head }).sum
// Double = 3.0
You can execute this as many times as you want and get the same result. Now let's cache rdd and repeat the whole process:
rdd.cache
rdd.map(a => { a(0) +=1; a.head }).sum
// Double = 3.0
rdd.map(a => { a(0) +=1; a.head }).sum
// Double = 6.0
rdd.map(a => { a(0) +=1; a.head }).sum
// Double = 9.0
Since the function we use in the first map is not pure and modifies its mutable argument in place, these changes are accumulated with each execution and result in unpredictable output. For example, if rdd is evicted from the cache we can once again get 3.0. If only some partitions are cached you can get mixed results.
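One way to keep this deterministic (a small sketch; rdd2 is just a fresh copy of the same data) is to make the function pure and build new values instead of mutating the cached arrays:

val rdd2 = sc.parallelize(Array(0) :: Array(0) :: Array(0) :: Nil).cache
// Pure function: reads the cached array but never mutates it,
// so every run returns the same result
rdd2.map(a => a.head + 1).sum
// Double = 3.0 on every execution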
PySpark provides stronger isolation and obtaining a result like this is not possible, but that is a matter of architecture, not immutability.
The take-away message here is that you should be extremely careful when working with mutable data and avoid any in-place modifications unless it is explicitly allowed (fold, aggregate).
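As a sketch of the explicitly allowed case: the seqOp and combOp passed to aggregate may modify and return their first argument, since each task works on its own copy of the zero value, so in-place updates of that accumulator are safe there:

val acc = sc.parallelize(1 to 100).aggregate(Array(0, 0))(
  (a, x) => { a(0) += x; a(1) += 1; a },        // seqOp: mutate and return the accumulator
  (a, b) => { a(0) += b(0); a(1) += b(1); a }   // combOp: merge per-partition accumulators in place
)
// acc: Array[Int] = Array(5050, 100)  -- sum and count of 1 to 100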
Take this example:
sc.makeRDD(1 to 100000).map(x => {
  println(x)
  x + 1
}).collect
If a node fails after the map has been completed, but the full results have yet to be sent back to the driver, then the map will be recomputed on a different machine. The final results will always be the same, as any value that is computed twice will only be sent back once. However, the println will have occurred twice for some calls. So, yes, immutability of the DAG itself is guaranteed, but you must still write your code with the assumption that it will be run more than once.
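A related sketch (the counter name is just illustrative): side effects such as accumulator updates inside a transformation can be applied more than once, not only after a node failure but also whenever an uncached stage is simply re-evaluated; Spark only guarantees exactly-once accumulator updates for actions.

val counter = sc.longAccumulator("processed")
val data = sc.makeRDD(1 to 100000).map { x =>
  counter.add(1)  // side effect inside a transformation: may run more than once per element
  x + 1
}
data.count
data.count
// Without caching, the second count re-runs the map,
// so counter.value ends up at 200000 rather than 100000.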