I found out that when using .map( identity ).cache on an RDD, it becomes very slow if the items are big, while it is pretty much instantaneous otherwise.
Note: this is probably related to this question, but here I provide a very precise example (that can be executed directly in spark-shell):
```scala
// simple function to profile execution time (in ms)
def profile[R](code: => R): R = {
  val t = System.nanoTime
  val out = code
  println(s"time = ${(System.nanoTime - t)/1000000}ms")
  out
}
// create some big size item
def bigContent() = (1 to 1000).map( i => (1 to 1000).map( j => (i,j) ).toMap )
// create rdd
val n = 1000 // size of the rdd
val rdd = sc.parallelize(1 to n).map( k => bigContent() ).cache
rdd.count // to trigger caching
// profiling
profile( rdd.count ) // around 12 ms
profile( rdd.map(identity).count ) // same
profile( rdd.cache.count ) // same
profile( rdd.map(identity).cache.count ) // 5700 ms !!!
```
I first expected it to be the time needed to create a new RDD (container). But if I use an RDD of the same size with small content, there is only a tiny difference in execution time:
```scala
val rdd = sc.parallelize(1 to n).cache
rdd.count
profile( rdd.count ) // around 9 ms
profile( rdd.map(identity).count ) // same
profile( rdd.cache.count ) // same
profile( rdd.map(identity).cache.count ) // 15 ms
```
So it looks like caching is actually copying the data. I thought it might also spend time serializing it, but I checked that cache uses the default MEMORY_ONLY persistence:
```scala
import org.apache.spark.storage.StorageLevel
rdd.getStorageLevel == StorageLevel.MEMORY_ONLY // true
```
=> So, is caching copying data, or is it something else?
This is really a major limitation for my application, because I started with a design that uses something similar to rdd = rdd.map(f: Item => Item).cache, which can be used with many such functions f applied in an arbitrary order (an order that I cannot determine beforehand).
I am using Spark 1.6.0
Edit
When I look at the Spark UI -> Stages tab -> the last stage (i.e. 4), all tasks show pretty much the same metrics:
- duration = 3s (it went down to 3s, but that's still 2.9s too much :-\ )
- scheduler 10ms
- task deserialization 20ms
- gc 0.1s (all tasks have that, but why would gc be triggered???)
- result serialization 0ms
- getting result 0ms
- peak exec mem 0.0B
- input size 7.0MB/125
- no errors
A jstack of the process running org.apache.spark.executor.CoarseGrainedExecutorBackend during the slow caching shows the executor threads spending their time inside Spark's SizeEstimator, in frames such as visitSingleObject, visitArray and IdentityHashMap operations.

The SizeEstimator makes sense as one of the main costs of caching something that is ostensibly already in memory, since proper size estimation for unknown objects can be fairly difficult. If you look at the visitSingleObject method, you can see it relies heavily on reflection, calling getClassInfo, which accesses runtime type information. Not only does the full object hierarchy get traversed, but each nested member gets checked against an IdentityHashMap to detect which references point to the same concrete object instance, which is why the stack traces show lots of time in those IdentityHashMap operations.

In the case of your example objects, each item is basically an IndexedSeq of maps from boxed integers to boxed integers (note that, as written, every pair (i,j) in an inner map shares the key i, so each inner map ends up with a single entry, i -> 1000). This nesting explains the visitSingleObject -> List.foreach -> visitSingleObject -> visitSingleObject call hierarchy. In any case, there are lots of inner objects to visit, and the SizeEstimator sets up a fresh IdentityHashMap for each object sampled.
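The traversal described above can be illustrated with a toy reachable-object counter: it walks reference fields via reflection and keeps an IdentityHashMap of visited instances, so a shared reference is only visited once. This is a hypothetical sketch (ObjectGraphCounter and countReachable are made-up names), not Spark's SizeEstimator, which also turns the fields it visits into byte-size estimates. JDK classes such as java.lang.Integer are treated as leaves here to avoid reflective access to JDK internals.

```scala
import java.lang.reflect.{Modifier, Array => JArray}
import java.util.{ArrayDeque, IdentityHashMap}

// Hypothetical toy, NOT Spark's SizeEstimator: counts distinct objects reachable from a root.
object ObjectGraphCounter {
  // JDK classes are treated as leaves: counted once, but their fields are not walked.
  private def isJdkClass(c: Class[_]): Boolean = {
    val name = c.getName
    name.startsWith("java.") || name.startsWith("javax.") ||
      name.startsWith("jdk.") || name.startsWith("sun.")
  }

  def countReachable(root: AnyRef): Int = {
    // Identity-based visited set: two references to the same instance count once.
    val seen = new IdentityHashMap[AnyRef, java.lang.Boolean]()
    val stack = new ArrayDeque[AnyRef]()
    stack.push(root)
    while (!stack.isEmpty) {
      val obj = stack.pop()
      if (!seen.containsKey(obj)) {
        seen.put(obj, java.lang.Boolean.TRUE)
        val cls = obj.getClass
        if (cls.isArray) {
          // Arrays of primitives hold no references to follow.
          if (!cls.getComponentType.isPrimitive) {
            var i = 0
            while (i < JArray.getLength(obj)) {
              val elem = JArray.get(obj, i)
              if (elem != null) stack.push(elem)
              i += 1
            }
          }
        } else if (!isJdkClass(cls)) {
          // Walk non-static reference fields of the class and its non-JDK superclasses.
          var c: Class[_] = cls
          while (c != null && !isJdkClass(c)) {
            for (f <- c.getDeclaredFields
                 if !Modifier.isStatic(f.getModifiers) && !f.getType.isPrimitive) {
              f.setAccessible(true)
              val v = f.get(obj)
              if (v != null) stack.push(v)
            }
            c = c.getSuperclass
          }
        }
      }
    }
    seen.size
  }

  // Same item generator as in the question.
  def bigContent() = (1 to 1000).map(i => (1 to 1000).map(j => (i, j)).toMap)

  def main(args: Array[String]): Unit = {
    val item = bigContent()
    println(s"entries per inner map: ${item.map(_.size).distinct}")
    println(s"objects reachable from one item: ${countReachable(item)}")
  }
}
```

Running main on the question's item prints how many entries each inner map has and how many distinct objects a single item drags along; in a traversal of this kind, each of them costs a reflective field walk plus an IdentityHashMap lookup.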
In the case where you measure profile( rdd.cache.count ), this doesn't count as exercising the caching logic, since the RDD has already been successfully cached and Spark is smart enough not to re-run the caching logic. You can isolate the exact cost of the caching logic, independently of the extra "map(identity)" transformation, by profiling fresh RDD creation with and without caching directly.
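A minimal sketch of that measurement as a standalone program, assuming a local SparkContext (the object name FreshCacheCost, the run helper and the local[*] master are illustrative choices, not taken from the original session). Both jobs build the same fresh RDD and only the second one caches it, so the gap between the two printed times roughly isolates the caching logic.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FreshCacheCost {
  // Same helper as in the question: prints elapsed wall-clock time in ms.
  def profile[R](code: => R): R = {
    val t = System.nanoTime
    val out = code
    println(s"time = ${(System.nanoTime - t) / 1000000}ms")
    out
  }

  // Same item generator as in the question.
  def bigContent() = (1 to 1000).map(i => (1 to 1000).map(j => (i, j)).toMap)

  // Builds the same fresh RDD twice; only the second job caches it.
  // Returns both counts so callers can check that every element was processed.
  def run(sc: SparkContext, n: Int): (Long, Long) = {
    val uncached = profile(sc.parallelize(1 to n).map(_ => bigContent()).count())
    val cachedRdd = sc.parallelize(1 to n).map(_ => bigContent()).cache()
    val cached = profile(cachedRdd.count())
    cachedRdd.unpersist()
    (uncached, cached)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("fresh-cache-cost"))
    try { run(sc, 1000) } finally { sc.stop() }
  }
}
```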
So the slowness didn't come from the fact that you ran through a map transformation, per se; rather, in this case the ~6s appears to be the fundamental cost of running the caching logic (size estimation included) over 1000 objects that each contain thousands of inner objects, a few million in total. The extra visitArray nesting in the top stack trace of the jstack output hints at nested arrays inside the collections, which fits the trie of arrays that backs Scala's Vector, the default IndexedSeq returned by (1 to 1000).map(...).

For your concrete use case, you should err on the side of lazy caching if possible, since caching intermediate results carries an overhead that's not a good tradeoff unless you really reuse those results for lots of separate downstream transformations. But as you mention in your question, if you're indeed using one RDD to branch out into multiple different downstream transformations, you might need the caching step if the original transformations are at all expensive.
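A minimal sketch of both ideas, assuming the per-item functions are plain Item => Item functions as in the question. Composing the functions into a single map means only the final RDD goes through the caching (and size estimation) logic, whereas caching after every step stores every intermediate RDD when the first action runs. The branch helper caches only the shared parent that several downstream jobs reuse. LazyCachingSketch, cacheEveryStep, composeThenCache and branch are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object LazyCachingSketch {
  // Pattern from the question: every intermediate RDD is marked for caching,
  // so the first action stores (and size-estimates) each of them.
  def cacheEveryStep[T: ClassTag](rdd: RDD[T], fs: Seq[T => T]): RDD[T] =
    fs.foldLeft(rdd)((r, f) => r.map(f).cache())

  // Lazier alternative: compose the functions and cache only the final result.
  def composeThenCache[T: ClassTag](rdd: RDD[T], fs: Seq[T => T]): RDD[T] = {
    val composed: T => T = fs.foldLeft((x: T) => x)(_ andThen _)
    rdd.map(composed).cache()
  }

  // Caching pays off at a branch point that several downstream jobs reuse.
  def branch[T: ClassTag](parent: RDD[T], left: T => T, right: T => T): (Array[T], Array[T]) = {
    val shared = parent.cache()
    try (shared.map(left).collect(), shared.map(right).collect())
    finally shared.unpersist()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("lazy-caching"))
    try {
      val fs: Seq[Int => Int] = Seq(_ + 1, _ * 2, _ - 3)
      println(composeThenCache(sc.parallelize(1 to 10), fs).collect().mkString(","))
    } finally sc.stop()
  }
}
```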
The workaround is to use inner data structures that are more amenable to constant-time size calculations (e.g. arrays of primitives), so you avoid much of the cost of iterating over huge numbers of wrapper objects and relying on reflection for them in the SizeEstimator.
I tried things like Array[Array[Int]], and even though there's still nonzero overhead, it's 10x better for a similar data size.
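A sketch of that kind of comparison, assuming a local SparkContext. As written in the question, every pair (i, j) in an inner map shares the key i, so each map built by bigContent() is Map(i -> 1000). The helpers asSeqOfArrays and asArrayOfArrays are hypothetical stand-ins (not the exact code behind the numbers quoted here) that store the same key/value pairs as primitive Int arrays, differing only in whether the outer container is an IndexedSeq or an Array. Like the question, timeRecache caches a base RDD first and then times only map(identity).cache(); actual timings will depend on your Spark version and hardware.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.reflect.ClassTag

object RepresentationCostSketch {
  // Same helper as in the question: prints elapsed wall-clock time in ms.
  def profile[R](code: => R): R = {
    val t = System.nanoTime
    val out = code
    println(s"time = ${(System.nanoTime - t) / 1000000}ms")
    out
  }

  // The question's item: an IndexedSeq of 1000 single-entry Map[Int, Int]s.
  def asMaps(): IndexedSeq[Map[Int, Int]] =
    (1 to 1000).map(i => (1 to 1000).map(j => (i, j)).toMap)

  // Hypothetical variant: the same pairs as Array(key, value), outer IndexedSeq kept.
  def asSeqOfArrays(): IndexedSeq[Array[Int]] = (1 to 1000).map(i => Array(i, 1000))

  // Hypothetical variant: the same data as an Array[Array[Int]].
  def asArrayOfArrays(): Array[Array[Int]] = asSeqOfArrays().toArray

  // Caches n items first, then times only map(identity).cache(), as in the question.
  def timeRecache[T: ClassTag](sc: SparkContext, n: Int)(make: () => T): Long = {
    val base = sc.parallelize(1 to n).map(_ => make()).cache()
    base.count()
    val recached = base.map(identity).cache()
    try profile(recached.count())
    finally { recached.unpersist(); base.unpersist() }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("representation-cost"))
    try {
      timeRecache(sc, 1000)(() => asMaps())
      timeRecache(sc, 1000)(() => asSeqOfArrays())
      timeRecache(sc, 1000)(() => asArrayOfArrays())
    } finally sc.stop()
  }
}
```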
To illustrate just how bad the cost of reflection on fancier objects is: if I drop the outer toArray so that each bigContent ends up being a scala.collection.immutable.IndexedSeq[Array[Int]], the performance goes back to being within ~2x of the slowness of the original IndexedSeq[Map[Int,Int]] case.

As discussed in the comments, you can also consider using the MEMORY_ONLY_SER StorageLevel: as long as there's an efficient serializer, it may well be cheaper than the recursive reflection used in SizeEstimator. To do that, you'd just replace cache() with persist(StorageLevel.MEMORY_ONLY_SER); as mentioned in this other question, cache() is conceptually the same thing as persist(StorageLevel.MEMORY_ONLY).

I actually tried this on both Spark 1.6.1 and Spark 2.0.0-preview, with everything else about the cluster configuration exactly the same (using Google Cloud Dataproc's "1.0" and "preview" image versions, respectively). Unfortunately, the MEMORY_ONLY_SER trick didn't appear to help in Spark 1.6.1.
But in Spark 2.0.0-preview it seemed to improve performance by 10x.
This could vary depending on your objects, though; a speedup would only be expected if serialization itself isn't using tons of reflection anyway. If you're able to use Kryo serialization effectively, it's likely you'll see an improvement using MEMORY_ONLY_SER for these large objects.
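A minimal sketch of the MEMORY_ONLY_SER route with Kryo, assuming a local master. The object and helper names (SerializedCacheSketch, kryoConf, cacheSerialized) are hypothetical, while spark.serializer, KryoSerializer, SparkConf.registerKryoClasses and StorageLevel.MEMORY_ONLY_SER are standard Spark settings and APIs. Registering classes is optional: without it Kryo still works but stores the full class name with each object. Whether this beats plain cache() depends on your Spark version and objects, as noted above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object SerializedCacheSketch {
  // Use Kryo instead of the default Java serializer, and register the
  // classes being cached so Kryo can avoid writing full class names.
  def kryoConf(appName: String): SparkConf =
    new SparkConf()
      .setMaster("local[*]")
      .setAppName(appName)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[Array[Int]], classOf[Array[Array[Int]]]))

  // Drop-in replacement for .cache(): store partitions as serialized bytes.
  def cacheSerialized[T](rdd: RDD[T]): RDD[T] = rdd.persist(StorageLevel.MEMORY_ONLY_SER)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(kryoConf("memory-only-ser"))
    try {
      val stored = cacheSerialized(sc.parallelize(1 to 1000).map(i => Array.fill(1000)(i)))
      val t = System.nanoTime
      stored.count() // first action materializes the serialized cache
      println(s"time = ${(System.nanoTime - t) / 1000000}ms, level = ${stored.getStorageLevel}")
    } finally sc.stop()
  }
}
```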