I am using Spark on a cluster that I share with other users, so running time alone is not a reliable way to tell which version of my code is more efficient. While I am running the more efficient code, someone else may be running a huge job that makes my code take longer.
So I have two questions:
1. I was using the join function to join two RDDs, and I tried calling groupByKey() before the join, like this:
rdd1.groupByKey().join(rdd2)
It seems to take longer. However, I remember that when I was using Hive on Hadoop, GROUP BY made my queries run faster. Since Spark uses lazy evaluation, I am wondering whether calling groupByKey before join makes things faster.
2. I have noticed that Spark has a SQL module. I haven't had time to try it yet, but what are the differences between the SQL module and the SQL-like functions on RDDs?
There is no good reason for groupByKey followed by join to be faster than join alone. If rdd1 and rdd2 have no partitioner, or their partitioners differ, then the limiting factor is simply the shuffle required for HashPartitioning.
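To make that shuffle concrete, here is a plain-Python model (no Spark involved) of hash partitioning on the same sample data used in the example that follows. The rule `hash(key) % num_partitions` and the helper names are assumptions standing in for Spark's HashPartitioner; the sketch only shows that records with equal keys from both inputs end up at the same partition index, which is what a join needs, and that getting there means routing every record of both inputs.

```python
def partition_index(key, num_partitions):
    # Assumed stand-in for a hash partitioner: non-negative hash modulo the partition count.
    return hash(key) % num_partitions

def hash_partition(pairs, num_partitions):
    """Route every (key, value) pair to the partition chosen by its key (the 'shuffle')."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_index(key, num_partitions)].append((key, value))
    return partitions

rdd1 = [("a", 1), ("a", 3), ("b", 2)]
rdd2 = [("a", 5), ("c", 6), ("b", 7)]
num_partitions = 4

# Neither input has a partitioner, so both have to be shuffled before joining.
parts1 = hash_partition(rdd1, num_partitions)
parts2 = hash_partition(rdd2, num_partitions)

# Records sharing a key now sit at the same partition index on both sides,
# so partition i of parts1 only ever has to meet partition i of parts2.
for i in range(num_partitions):
    print(i, parts1[i], parts2[i])
```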
By using groupByKey you not only increase the total cost by keeping the mutable buffers required for grouping but, more importantly, you add an extra transformation, which results in a more complex DAG. groupByKey + join:
rdd1 = sc.parallelize([("a", 1), ("a", 3), ("b", 2)])
rdd2 = sc.parallelize([("a", 5), ("c", 6), ("b", 7)])
rdd1.groupByKey().join(rdd2)
vs. join alone:
rdd1.join(rdd2)
Finally, these two plans are not even equivalent: to get the same results you have to add an additional flatMap to the first one.
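A plain-Python model of the join semantics (not Spark; `join` and `group_by_key` here are hypothetical helpers mimicking inner-join and grouping behavior on lists of pairs) shows why the two plans differ: after groupByKey each joined record carries a whole group of values, and a flatMap-style step is needed to get back to one record per matching pair.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Collect all values per key, like groupByKey (values kept as lists here)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return list(groups.items())

def join(left, right):
    """Inner join of (key, value) lists: one output per matching (v, w) combination."""
    right_by_key = defaultdict(list)
    for key, w in right:
        right_by_key[key].append(w)
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]

rdd1 = [("a", 1), ("a", 3), ("b", 2)]
rdd2 = [("a", 5), ("c", 6), ("b", 7)]

plain = join(rdd1, rdd2)                  # ("a", (1, 5)), ("a", (3, 5)), ("b", (2, 7))
grouped = join(group_by_key(rdd1), rdd2)  # ("a", ([1, 3], 5)), ("b", ([2], 7))

# The extra flatMap: expand each group back into individual (v, w) pairs.
flattened = [(key, (v, w)) for key, (vs, w) in grouped for v in vs]

print(sorted(flattened) == sorted(plain))
```

In PySpark the corresponding extra step would be something like `rdd1.groupByKey().join(rdd2).flatMap(lambda kv: [(kv[0], (v, kv[1][1])) for v in kv[1][0]])`.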
This is quite a broad question, but to highlight the main differences:
PairwiseRDDs are homogeneous collections of arbitrary Tuple2 elements. For the default operations you want the key to be hashable in a meaningful way; otherwise there are no strict requirements regarding the type. In contrast, DataFrames exhibit much more dynamic typing, but each column can only contain values from a supported set of defined types. It is possible to define a UDT, but it still has to be expressed using the basic types.
DataFrames use the Catalyst optimizer, which generates logical and physical execution plans and can produce highly optimized queries without the need for manual low-level optimizations. RDD-based operations simply follow the dependency DAG. This means worse performance without custom optimization, but much better control over execution and some potential for fine-grained tuning.
Some other things to read:
- Difference between DataFrame and RDD in Spark
- Why spark.ml don't implement any of spark.mllib algorithms?
I mostly agree with zero323's answer, but I think there is a reason to expect join to be faster after groupByKey. groupByKey reduces the amount of data and partitions the data by key. Both of these help the performance of a subsequent join.
I don't think the former (reduced data size) is significant. To reap the benefits of the latter (partitioning), you need to have the other RDD partitioned the same way.
For example:
val a = sc.parallelize((1 to 10).map(_ -> 100)).groupByKey()
val b = sc.parallelize((1 to 10).map(_ -> 100)).partitionBy(a.partitioner.get)
a.join(b).collect
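The effect of sharing a partitioner can be modeled in plain Python (an illustration, not Spark itself): once both sides use the same partitioner, joining partition i of a with partition i of b, for each i, gives exactly the result of a global join, so no record has to move between partitions at join time. `partition_by`, `group_by_key` and `local_join` are hypothetical helpers, and `hash(key) % num_partitions` is an assumed stand-in for the partitioner groupByKey picks.

```python
from collections import defaultdict

def partition_index(key, num_partitions):
    # Assumed stand-in for a hash partitioner.
    return hash(key) % num_partitions

def partition_by(pairs, num_partitions):
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_index(key, num_partitions)].append((key, value))
    return partitions

def group_by_key(pairs):
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return list(groups.items())

def local_join(left, right):
    """Inner join of two lists of (key, value) pairs within one partition."""
    right_by_key = defaultdict(list)
    for key, w in right:
        right_by_key[key].append(w)
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]

num_partitions = 4
data = [(i, 100) for i in range(1, 11)]

# a: grouped and hash partitioned, loosely mirroring sc.parallelize(...).groupByKey()
a_parts = partition_by(group_by_key(data), num_partitions)
# b: partitioned with the same partitioner, loosely mirroring .partitionBy(a.partitioner.get)
b_parts = partition_by(data, num_partitions)

# Co-partitioned join: each partition of a only meets the matching partition of b.
co_partitioned = [kv for i in range(num_partitions) for kv in local_join(a_parts[i], b_parts[i])]

# Reference: join everything at once, ignoring partitions.
flat_a = [kv for part in a_parts for kv in part]
flat_b = [kv for part in b_parts for kv in part]
global_join = local_join(flat_a, flat_b)

print(sorted(co_partitioned) == sorted(global_join))
```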