Performance impact of RDD API vs UDFs mixed with DataFrame API

Posted 2019-02-07 13:39

Question:

(Scala-specific question.)

While the Spark docs encourage the use of the DataFrame API where possible, if the DataFrame API is insufficient the choice is usually between falling back to the RDD API or using UDFs. Is there an inherent performance difference between these two alternatives?

RDDs and UDFs are similar in that neither of them can benefit from Catalyst and Tungsten optimizations. Is there any other overhead, and if there is, does it differ between the two approaches?

To give a specific example, let's say I have a DataFrame that contains a column of text data with custom formatting (not amenable to regexp matching). I need to parse that column and add a new vector column that contains the resulting tokens.
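
Concretely, I imagine something like the UDF version below, where parseTokens is just a placeholder for the real parsing logic, "text" is the input column, and the tokens come back as an array column for simplicity:

import org.apache.spark.sql.functions.{col, udf}

// Hypothetical parser for the custom format; splitting on '|' is only a stand-in.
def parseTokens(raw: String): Seq[String] = raw.split('|').toSeq

val parseUdf = udf(parseTokens _)

// df is assumed to have a string column named "text".
val withTokens = df.withColumn("tokens", parseUdf(col("text")))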

Answer 1:

neither of them can benefit from Catalyst and Tungsten optimizations

This is not exactly true. While UDFs don't benefit from Tungsten optimization (arguably, simple SQL transformations don't get a huge boost there either), you may still benefit from the execution plan optimizations provided by Catalyst. Let's illustrate that with a simple example (note: Spark 2.0 and Scala; don't extrapolate this to earlier versions, especially with PySpark):

// Assumes a spark-shell style environment with a SparkSession available as spark.
import org.apache.spark.sql.functions.{sum, udf}
import spark.implicits._

val f = udf((x: String) => x == "a")   // used only as a filter predicate
val g = udf((x: Int) => x + 1)         // applied to a column that is later dropped

val df = Seq(("a", 1), ("b", 2)).toDF

df
  .groupBy($"_1")
  .agg(sum($"_2").as("_2"))
  .where(f($"_1"))
  .withColumn("_2", g($"_2"))
  .select($"_1")
  .explain

// == Physical Plan ==
// *HashAggregate(keys=[_1#2], functions=[])
// +- Exchange hashpartitioning(_1#2, 200)
//    +- *HashAggregate(keys=[_1#2], functions=[])
//       +- *Project [_1#2]
//          +- *Filter UDF(_1#2)
//             +- LocalTableScan [_1#2, _2#3]

The execution plan shows us a couple of things:

  • Selection (the filter) has been pushed down before the aggregation.
  • Projection has been pushed down before the aggregation and has effectively removed the second UDF call (g never appears in the plan).

Depending on the data and pipeline this can provide a substantial performance boost almost for free.
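
For contrast, a rough sketch of the same pipeline written directly against the RDD API; nothing there reorders the filter or prunes unused columns for you, so both have to be placed by hand:

// Same logic on RDDs: no Catalyst, so filter and projection placement is entirely manual.
val result = df.rdd
  .map(row => (row.getString(0), row.getInt(1)))
  .filter { case (k, _) => k == "a" }   // the "pushdown", done by hand
  .reduceByKey(_ + _)
  .keys                                 // the manual projection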

That being said, both RDDs and UDFs require migrating data between the internal ("unsafe") representation and regular JVM objects, with UDFs being significantly less flexible of the two. Still, if the only thing you need is simple map-like behavior, without initializing expensive objects (like database connections), then a UDF is the way to go.
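
When per-record initialization would be too expensive, the usual escape hatch is mapPartitions; a minimal sketch, with a hypothetical ExpensiveParser standing in for something like a database connection or a compiled parser:

// Hypothetical heavyweight helper; imagine a database connection or a compiled parser.
class ExpensiveParser {
  def parse(s: String): Seq[String] = s.split(' ').toSeq
}

val parsed = df.rdd.mapPartitions { rows =>
  val parser = new ExpensiveParser    // created once per partition, not once per row
  rows.map(row => parser.parse(row.getString(0)))   // assumes the text sits in the first column
}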

In slightly more complex scenarios you can easily drop down to a generic Dataset and reserve RDDs for cases where you really require access to low-level features like custom partitioning.
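
A minimal sketch of the typed Dataset route, assuming a SparkSession named spark and an input DataFrame with a string column named "text":

import spark.implicits._

case class Record(text: String)

// Arbitrary Scala inside map, while staying within the Dataset API.
val tokens = df.as[Record].map(r => r.text.split(' ').toSeq)   // Dataset[Seq[String]]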