Spark RDD groupByKey + join vs join performance

Posted 2019-09-02 17:36

Question:

I am using Spark on a cluster that I share with other users, so running time alone is not a reliable way to tell which version of my code is more efficient: while I am running the more efficient code, someone else may be running a huge job that makes my code take longer.

So can I ask 2 questions here:

  1. I was using the join function to join two RDDs, and I am now trying to use groupByKey() before the join, like this:

    rdd1.groupByKey().join(rdd2)
    

    It seems to take longer. However, I remember that when I was using Hadoop Hive, a group by made my query run faster. Since Spark uses lazy evaluation, I am wondering whether groupByKey before join makes things faster.

  2. I have noticed that Spark has a SQL module. So far I have not had time to try it, but what are the differences between the SQL module and the SQL-like functions on RDDs?

Answer 1:

  1. There is no good reason for groupByKey followed by join to be faster than join alone. If rdd1 and rdd2 have no partitioner, or their partitioners differ, then the limiting factor is simply the shuffle required for hash partitioning.

    By using groupByKey you not only increase the total cost, because of the mutable buffers required for grouping, but more importantly you add an extra transformation, which results in a more complex DAG. groupByKey + join:

    rdd1 = sc.parallelize([("a", 1), ("a", 3), ("b", 2)])
    rdd2 = sc.parallelize([("a", 5), ("c", 6), ("b", 7)])
    rdd1.groupByKey().join(rdd2)
    

    vs. join alone:

    rdd1.join(rdd2)
    

    Finally, these two plans are not even equivalent: to get the same results you have to add an additional flatMap to the first one.

  2. This is quite a broad question, but to highlight the main differences:

    • PairwiseRDDs are homogeneous collections of arbitrary Tuple2 elements. For default operations you want the key to be hashable in a meaningful way; otherwise there are no strict requirements regarding the type. In contrast, DataFrames exhibit much more dynamic typing, but each column can only contain values from a supported set of defined types. It is possible to define a UDT, but it still has to be expressed using the basic ones.

    • DataFrames use the Catalyst Optimizer, which generates logical and physical execution plans and can produce highly optimized queries without the need for manual low-level optimizations. RDD-based operations simply follow the dependency DAG. This means worse performance without custom optimization, but much better control over execution and some potential for fine-grained tuning.
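
To make point 1 concrete, here is a minimal pure-Python model of the two plans, using the same sample data as above. It is only a sketch of the record shapes, not Spark itself: `group_by_key` and `join` are hypothetical helpers that mimic what the RDD methods return, and the final list comprehension plays the role of the extra flatMap.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Hypothetical model of groupByKey: one (key, [values]) record per key."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return list(groups.items())

def join(left, right):
    """Hypothetical model of an inner join: one (key, (l, r)) record per matching pair."""
    right_index = defaultdict(list)
    for k, v in right:
        right_index[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_index.get(k, [])]

rdd1 = [("a", 1), ("a", 3), ("b", 2)]
rdd2 = [("a", 5), ("c", 6), ("b", 7)]

# Plan 1: join alone, one output record per matching pair of values.
plain = join(rdd1, rdd2)

# Plan 2: groupByKey first, so the left side of each record is a whole group.
grouped = join(group_by_key(rdd1), rdd2)

# The extra "flatMap" step needed to turn plan 2 back into plan 1's shape.
flattened = [(k, (lv, rv)) for k, (lvs, rv) in grouped for lv in lvs]
```

In this model `grouped` holds one record per key with a list on the left side, while `plain` holds one record per matching pair, so the two only agree after the flattening step.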

Some other things to read:

  • Difference between DataFrame and RDD in Spark
  • Why spark.ml don't implement any of spark.mllib algorithms?


Answer 2:

I mostly agree with zero323's answer, but I think there is reason to expect join to be faster after groupByKey. groupByKey reduces the amount of data and partitions the data by the key. Both of these help with the performance of a subsequent join.

I don't think the former (reduced data size) is significant. And to reap the benefits of the latter (partitioning) you need to have the other RDD partitioned the same way.

For example:

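// groupByKey hash-partitions a by key, so a.partitioner is defined
val a = sc.parallelize((1 to 10).map(_ -> 100)).groupByKey()
// Partition b with the same partitioner as a, so both sides are laid out the same way
val b = sc.parallelize((1 to 10).map(_ -> 100)).partitionBy(a.partitioner.get)
a.join(b).collect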
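
The partitioning argument can be sketched in plain Python as well. This is a toy model under stated assumptions, not Spark's implementation: `partition_by` and `local_join` are hypothetical helpers, the partition count of 4 is arbitrary, and the grouping step is left out so that only the co-partitioning effect is shown. When both sides are routed by the same function, partition i on one side only ever needs partition i on the other.

```python
def partition_by(pairs, num_partitions):
    """Hypothetical hash partitioner: route each record by hash(key) % n."""
    parts = [[] for _ in range(num_partitions)]
    for k, v in pairs:
        parts[hash(k) % num_partitions].append((k, v))
    return parts

def local_join(left_part, right_part):
    """Join two partitions without looking at any other partition."""
    index = {}
    for k, v in right_part:
        index.setdefault(k, []).append(v)
    return [(k, (lv, rv)) for k, lv in left_part for rv in index.get(k, [])]

a = [(i, 100) for i in range(1, 11)]
b = [(i, 100) for i in range(1, 11)]

n = 4  # arbitrary partition count chosen for the illustration
a_parts = partition_by(a, n)
b_parts = partition_by(b, n)

# Co-partitioned inputs: pair up partitions by index and join each pair locally.
joined = [rec for pa, pb in zip(a_parts, b_parts) for rec in local_join(pa, pb)]
```

If `b` were partitioned with a different function or a different `n`, matching keys could sit in different partition indexes and the pairwise local join would miss them, which is the situation where a real join has to shuffle.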