I found out that when using .map( identity ).cache
on a rdd, it become very slow if the items are big. While it is pretty much instantaneous otherwise.
Note: this is probably related to this question, but here I provide a very precise example (that can be executed directly in spark-shell):
// simple function to profile execution time (in ms)
def profile[R](code: => R): R = {
val t = System.nanoTime
val out = code
println(s"time = ${(System.nanoTime - t)/1000000}ms")
out
}
// create some big size item
def bigContent() = (1 to 1000).map( i => (1 to 1000).map( j => (i,j) ).toMap )
// create rdd
val n = 1000 // size of the rdd
val rdd = sc.parallelize(1 to n).map( k => bigContent() ).cache
rdd.count // to trigger caching
// profiling
profile( rdd.count ) // around 12 ms
profile( rdd.map(identity).count ) // same
profile( rdd.cache.count ) // same
profile( rdd.map(identity).cache.count ) // 5700 ms !!!
I first expected that it was the time to create a new rdd (container). But if I use a rdd with same size but little content, there is only a tiny difference in execution time:
val rdd = parallelize(1 to n).cache
rdd.count
profile( rdd.count ) // around 9 ms
profile( rdd.map(identity).count ) // same
profile( rdd.cache.count ) // same
profile( rdd.map(identity).cache.count ) // 15 ms
So, it looks like caching is actually copying the data. I thought it might also lose time serializing it, but I checked that cache is used with default MEMORY_ONLY persistence:
rdd.getStorageLevel == StorageLevel.MEMORY_ONLY // true
=> So, is caching copying data, or is it something else?
This is really a major limitation for my application because I started with a design that use something similar to rdd = rdd.map(f: Item => Item).cache
that can be used with many such functions f applied in arbitrary order (order that I cannot determine before hand).
I am using Spark 1.6.0
Edit
When I look at the spark ui -> stage tab -> the last stage (i.e. 4), all tasks have pretty much the same data with:
- duration = 3s (it went down to 3s, but that's still 2.9 too much :-\ )
- scheduler 10ms
- task deserialization 20ms
- gc 0.1s (all tasks have that, but why would gc be triggered???)
- result serialization 0ms
- getting result 0ms
- peak exec mem 0.0B
- input size 7.0MB/125
- no errors
A jstack
of the process running the org.apache.spark.executor.CoarseGrainedExecutorBackend
during the slow caching reveals the following:
"Executor task launch worker-4" #76 daemon prio=5 os_prio=0 tid=0x00000000030a4800 nid=0xdfb runnable [0x00007fa5f28dd000]
java.lang.Thread.State: RUNNABLE
at java.util.IdentityHashMap.resize(IdentityHashMap.java:481)
at java.util.IdentityHashMap.put(IdentityHashMap.java:440)
at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:251)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
at org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp(SizeEstimator.scala:284)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.SizeEstimator$.sampleArray(SizeEstimator.scala:276)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:260)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"Executor task launch worker-5" #77 daemon prio=5 os_prio=0 tid=0x00007fa6218a9800 nid=0xdfc runnable [0x00007fa5f34e7000]
java.lang.Thread.State: RUNNABLE
at java.util.IdentityHashMap.put(IdentityHashMap.java:428)
at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:224)
at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:223)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:223)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The SizeEstimator makes sense as one of the main costs of caching something which is ostensibly already in memory, since proper size estimation for unknown objects can be fairly difficult; if you look in the visitSingleObject method, you can see it heavily relies on reflection, calling getClassInfo
which accesses runtime type information; not only does the full object hierarchy get traversed, but each nested member gets checked against an IdentityHashMap
to detect which references refer to the same concrete object instance, and thus the stack traces show lots of time in those IdentityHashMap operations.
In the case of your example objects, you basically have each item as a list of maps from wrapped integers to wrapped integers; presumably Scala's implementation of the inner map holds an array as well, which explains the visitSingleObject -> List.foreach -> visitSingleObject -> visitSingleObject call hierarchy. In any case, there are lots of inner objects to visit in this case, and the SizeEstimators set up a fresh IdentityHashMap for each object sampled.
In the case where you measure:
profile( rdd.cache.count )
this doesn't count as exercising the caching logic since the RDD has already been successfully cached, so Spark is smart enough not to re-run the caching logic. You can actually isolate out the exact cost of the caching logic independently of the extra "map(identity)" transformation by profiling your fresh RDD creation and caching directly; here's my Spark session continuing from your last few lines:
scala> profile( rdd.count )
time = 91ms
res1: Long = 1000
scala> profile( rdd.map(identity).count )
time = 112ms
res2: Long = 1000
scala> profile( rdd.cache.count )
time = 59ms
res3: Long = 1000
scala> profile( rdd.map(identity).cache.count )
time = 6564ms
res4: Long = 1000
scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).count )
time = 14990ms
res5: Long = 1000
scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).cache.count )
time = 22229ms
res6: Long = 1000
scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).map(identity).cache.count )
time = 21922ms
res7: Long = 1000
So you can see, the slowness didn't come from the fact that you ran through a map
transformation, per se, but rather in this case the ~6s appears to be the fundamental cost of calculating caching logic for 1000 objects when each object has something like ~1,000,000 to ~10,000,000 inner objects (depending on how the Map implementation is layed out; ex extra visitArray
nesting in the top stack trace hints that the HashMap impl has nested arrays, which makes sense for a typical dense linear-probing data structure inside each hashtable entry).
For your concrete use case, you should err on the side of lazy caching if possible, since there's overhead associated with caching intermediate results that's not a good tradeoff if you're not really going to reuse the intermediate results for lots of separate downstream transformations. But as you mention in your question, if you're indeed using one RDD to branch out into multiple different downstream transformations, you might indeed need the caching step if the original transformations are at all expensive.
The workaround is to try to have inner data structures which are more amenable to constant-time calculations (e.g. arrays of primitives), where you can save a lot of cost on avoiding iterating over huge numbers of wrapper objects and depending on reflection for them in the SizeEstimator.
I tried things like Array[Array[Int]] and even though there's still nonzero overhead, it's 10x better for a similar data size:
scala> def bigContent2() = (1 to 1000).map( i => (1 to 1000).toArray ).toArray
bigContent2: ()Array[Array[Int]]
scala> val rdd = sc.parallelize(1 to n).map( k => bigContent2() ).cache
rdd: org.apache.spark.rdd.RDD[Array[Array[Int]]] = MapPartitionsRDD[23] at map at <console>:28
scala> rdd.count // to trigger caching
res16: Long = 1000
scala>
scala> // profiling
scala> profile( rdd.count )
time = 29ms
res17: Long = 1000
scala> profile( rdd.map(identity).count )
time = 42ms
res18: Long = 1000
scala> profile( rdd.cache.count )
time = 34ms
res19: Long = 1000
scala> profile( rdd.map(identity).cache.count )
time = 763ms
res20: Long = 1000
To illustrate just how bad the cost of reflection on any fancier objects is, if I remove the last toArray
there and end up with each bigContent
being a scala.collection.immutable.IndexedSeq[Array[Int]]
, the performance goes back to being within ~2x the slowness of the original IndexSeq[Map[Int,Int]]
case:
scala> def bigContent3() = (1 to 1000).map( i => (1 to 1000).toArray )
bigContent3: ()scala.collection.immutable.IndexedSeq[Array[Int]]
scala> val rdd = sc.parallelize(1 to n).map( k => bigContent3() ).cache
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.IndexedSeq[Array[Int]]] = MapPartitionsRDD[27] at map at <console>:28
scala> rdd.count // to trigger caching
res21: Long = 1000
scala>
scala> // profiling
scala> profile( rdd.count )
time = 27ms
res22: Long = 1000
scala> profile( rdd.map(identity).count )
time = 39ms
res23: Long = 1000
scala> profile( rdd.cache.count )
time = 37ms
res24: Long = 1000
scala> profile( rdd.map(identity).cache.count )
time = 2781ms
res25: Long = 1000
As discussed in the comment section, you can also consider using the MEMORY_ONLY_SER StorageLevel, where as long as there's an efficient serializer, it can quite possibly be cheaper than the recursive reflection used in SizeEstimator; to do that you'd just replace cache()
with persist(StorageLevel.MEMORY_ONLY_SER)
; as mentioned in this other question, cache()
is conceptually the same thing as persist(StorageLevel.MEMORY_ONLY)
.
import org.apache.spark.storage.StorageLevel
profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
I actually tried this on both Spark 1.6.1 and Spark 2.0.0-preview running with everything else about the cluster configuration exactly the same (using Google Cloud Dataproc's "1.0" and "preview" image-versions, respectively). Unfortunately the MEMORY_ONLY_SER trick didn't appear to help in Spark 1.6.1:
scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 6709ms
res19: Long = 1000
scala> profile( rdd.map(identity).cache.count )
time = 6126ms
res20: Long = 1000
scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 6214ms
res21: Long = 1000
But in Spark 2.0.0-preview it seemed to improve performance by 10x:
scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 500ms
res18: Long = 1000
scala> profile( rdd.map(identity).cache.count )
time = 5353ms
res19: Long = 1000
scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 5927ms
res20: Long = 1000
This could vary depending on your objects though; speedup would only be expected if serialization itself isn't using tons of reflection anyway; if you're able to effectively use the Kryo serialization then it's likely you can see improvement using MEMORY_ONLY_SER
for these large objects.