I'm using com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 when running Zeppelin notebooks and I don't understand the difference between two operations in Spark. One operation takes a lot of time to compute, the other executes immediately. Could someone explain the difference between these two operations:
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
case class SomeClass(val someField:String)
val timelineItems = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "spark.cassandra.connection.host" -> "127.0.0.1",
    "table" -> "timeline_items",
    "keyspace" -> "timeline"))
  .load()
//some simplified code:
val timelineRow = timelineItems
.map(x => {SomeClass("test")})
.filter(x => x != null)
.toDF()
.limit(4)
//first operation (takes a lot of time; it seems Spark iterates through all items in Cassandra and doesn't take advantage of the laziness of limit(4))
println(timelineRow.count()) //return: 4
//second operation (executes immediately); 300 is just an arbitrary number which doesn't affect the result
println(timelineRow.take(300).length) //return: 4
What you see is a difference between the implementation of Limit (a transformation-like operation) and CollectLimit (an action-like operation). However, the difference in timings is highly misleading, and not something you can expect in the general case.

First let's create a MCVE:
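A sketch of such an MCVE, using a plain local source instead of Cassandra: eight partitions of 100 rows each, with an artificial per-row delay so the cost of touching every partition is visible. The names slowDs and dsLimit4 are illustrative; spark is the session already available in the notebook.

import spark.implicits._

// eight partitions with 100 rows each; the sleep makes the cost of
// evaluating each partition clearly visible
val slowDs = spark.range(0, 800, 1, 8)
  .as[Long]
  .map { x => Thread.sleep(10); x }

val dsLimit4 = slowDs.limit(4)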
make sure we start with a clean slate:
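One way to confirm that no jobs have run yet in the current session is to ask the status tracker (null selects jobs outside any named job group):

// should be empty if nothing has been executed so far
spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty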
invoke count:
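Using the dsLimit4 name assumed in the sketch above:

dsLimit4.count()
// returns 4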
and take a look at the execution plan (from Spark UI):
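An equivalent view can also be printed from code: count() runs groupBy().count() under the hood, so the same physical plan can be inspected like this (the commented output is only a rough outline, exact operator names differ between Spark versions):

dsLimit4.groupBy().count().explain()
// roughly:
// HashAggregate(count)
// +- GlobalLimit 4
//    +- Exchange SinglePartition   <- the wide step: data is shuffled to one partition
//       +- LocalLimit 4
//          +- ... mapped source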
The core component is the Exchange (a shuffle to a single partition) between the local and the global limit, which indicates that we can expect a wide operation with multiple stages. We can see a single job with two stages, with eight and one tasks respectively.
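If you prefer not to rely on the UI, the same job/stage/task counts can be read programmatically; the snippet below is just one way to do it, again using the default (null) job group:

val tracker = spark.sparkContext.statusTracker
for (jobId <- tracker.getJobIdsForGroup(null);
     job   <- tracker.getJobInfo(jobId);
     stage <- job.stageIds();
     info  <- tracker.getStageInfo(stage)) {
  println(s"job $jobId, stage $stage, tasks: ${info.numTasks()}")
}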
Now let's compare it to take, which generates the following plan:
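take(300) is effectively executed as limit(300) on the already-limited dataset, so its plan can be sketched the same way (the commented output is indicative only):

dsLimit4.limit(300).explain()
// roughly:
// CollectLimit 4   <- the planner narrowed 300 down to the stricter 4; no Exchange
// +- ... mapped source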
While both the global and the local limits still occur, there is no exchange in the middle. Therefore we can expect a single-stage operation. Please note that the planner narrowed down the limit to the more restrictive value.
As expected we see a single new job, which generated only one stage with only one task.
What does this mean for us?
In the count case, Spark used a wide transformation: it actually applies LocalLimit on each partition and shuffles the partial results to perform GlobalLimit.

In the take case, Spark used a narrow transformation and evaluated LocalLimit only on the first partition.

Obviously the latter approach won't work when the number of values in the first partition is lower than the requested limit.
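With the toy data from the earlier sketch (100 rows per partition) that situation can be reproduced simply by asking for more rows than the first partition holds; dsLimit150 is an illustrative name:

// 150 rows cannot come from the first partition alone (each holds only 100)
val dsLimit150 = slowDs.limit(150)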
In such a case the first count will use exactly the same logic as before (I encourage you to confirm that empirically), but take will take a rather different path. So far we have triggered only two jobs. Now if we execute take on this dataset, you'll see that it requires 3 more jobs:
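A rough way to observe the extra jobs from code, using the status tracker and the illustrative names from above (the exact number of additional jobs depends on the data layout):

val before = spark.sparkContext.statusTracker.getJobIdsForGroup(null).length

dsLimit150.take(300)

val after = spark.sparkContext.statusTracker.getJobIdsForGroup(null).length
// expect more than one additional job: the limit is evaluated iteratively,
// scanning more partitions on every retry
println(after - before)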
So what's going on here? As noted before, evaluating a single partition is not enough to satisfy the limit in the general case. In such a case Spark iteratively evaluates LocalLimit on partitions until GlobalLimit is satisfied, increasing the number of partitions taken in each iteration.

Such a strategy can have significant performance implications. Starting Spark jobs alone is not cheap, and when the upstream object is the result of a wide transformation things can get quite ugly (in the best case scenario you can read shuffle files, but if these are lost for some reason, Spark might be forced to re-execute all the dependencies).
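As far as I remember, the growth rate of that iteration is controlled by spark.sql.limit.scaleUpFactor (each retry scans roughly that many times more partitions; the default should be 4). Treat this as an aside rather than a tuning recommendation:

// scan more partitions per retry if the first partitions rarely contain
// enough rows to satisfy the limit
spark.conf.set("spark.sql.limit.scaleUpFactor", "8")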
To summarize:
take is an action, and can short-circuit in specific cases where the upstream process is narrow and LocalLimits can satisfy GlobalLimits using only the first few partitions.

limit is a transformation, and always evaluates all LocalLimits, as there is no iterative escape hatch.

While one can behave better than the other in specific cases, they are not exchangeable and neither guarantees better performance in general.