(Scala-specific question.)
While Spark docs encourage the use of DataFrame API where possible, if DataFrame API is insufficient, the choice is usually between falling back to RDD API or using UDFs. Is there inherent performance difference between these two alternatives?
RDD and UDF are similar in that neither of them can benefit from Catalyst and Tungsten optimizations. Is there any other overhead, and if there is, does it differ between the two approaches?
To give a specific example, let's say I have a DataFrame that contains a column of text data with custom formatting (not amenable to regexp matching). I need to parse that column and add a new vector column that contains the resulting tokens.
neither of them can benefit from Catalyst and Tungsten optimizations
This is not exactly true. While UDFs don't benefit from Tungsten optimization (arguably simple SQL transformation don't get huge boost there either) you still may benefit from execution plan optimizations provided by Catalyst. Let's illustrate that with a simple example (Note: Spark 2.0 and Scala. Don't extrapolate this to earlier versions, especially with PySpark):
val f = udf((x: String) => x == "a")
val g = udf((x: Int) => x + 1)
val df = Seq(("a", 1), ("b", 2)).toDF
df
.groupBy($"_1")
.agg(sum($"_2").as("_2"))
.where(f($"_1"))
.withColumn("_2", g($"_2"))
.select($"_1")
.explain
// == Physical Plan ==
// *HashAggregate(keys=[_1#2], functions=[])
// +- Exchange hashpartitioning(_1#2, 200)
// +- *HashAggregate(keys=[_1#2], functions=[])
// +- *Project [_1#2]
// +- *Filter UDF(_1#2)
// +- LocalTableScan [_1#2, _2#3]
Execution plan shows us a couple of things:
- Selection has been pushed down before aggregation.
- Projection has been pushed down before aggregation and effectively removed second UDF call.
Depending on the data and pipeline this can provide a substantial performance boost almost for free.
That being said both RDDs and UDFs require migrations between safe and unsafe with the latter one being significantly less flexible. Still, if the only thing you need is a simple map
-like behavior without initializing expensive objects (like database connections) then UDF is the way to go.
In slightly more complex scenarios you can easily drop down to generic Dataset
and reserve RDDs
for cases when you really require an access to some low level features like custom partitioning.