Suppose I want to write a function foo that transforms a DataFrame:
object Foo {
  def foo(source: DataFrame): DataFrame = {
    ...complex iterative algorithm with a stopping condition...
  }
}
Since the implementation of foo contains many "Actions" (collect, reduce, etc.), calling foo immediately triggers the expensive execution.
This is not a big problem. However, since foo only converts one DataFrame into another, by convention it should allow lazy execution: the implementation of foo should run only when the resulting DataFrame or one of its derivatives is used on the Driver (through another "Action").
So far, the only way to reliably achieve this is to write the whole implementation as a SparkPlan and superimpose it onto the DataFrame's SparkExecution. This is very error-prone and involves a lot of boilerplate code. What is the recommended way to do this?
It is not exactly clear to me what you are trying to achieve, but Scala itself provides at least a few tools that you may find useful:
lazy vals:
val rdd = sc.range(0, 10000)
lazy val count = rdd.count // Nothing is executed here
// count: Long = <lazy>
count // count is evaluated only when it is actually used
// Long = 10000
call-by-name (denoted by => in the function definition):
def foo(first: => Long, second: => Long, takeFirst: Boolean): Long =
  if (takeFirst) first else second

val rdd1 = sc.range(0, 10000)
val rdd2 = sc.range(0, 10000)

foo(
  { println("first"); rdd1.count },
  { println("second"); rdd2.count },
  true // Only first will be evaluated
)
// first
// Long = 10000
Note: A by-name argument is re-evaluated every time it is accessed, so in practice you should bind it to a local lazy val inside the function to make sure it is evaluated at most once.
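To illustrate the note above, here is a minimal sketch in plain Scala (no Spark needed). The names ByNameDemo, expensive, twiceNaive and twiceCached are made up for this example; expensive() just stands in for a costly action such as rdd.count, and the counter only exists to make the number of evaluations visible.

```scala
object ByNameDemo {
  // Counts how many times the "expensive" argument is actually evaluated.
  var evaluations = 0

  // Hypothetical stand-in for an expensive Spark action such as rdd.count.
  def expensive(): Long = { evaluations += 1; 10000L }

  // The by-name argument is re-evaluated on each access: twice here.
  def twiceNaive(x: => Long): Long = x + x

  // A local lazy val caches the value: evaluated at most once, and only if used.
  def twiceCached(x: => Long): Long = {
    lazy val cached = x
    cached + cached
  }
}
```

Calling ByNameDemo.twiceNaive(ByNameDemo.expensive()) runs expensive() twice, while ByNameDemo.twiceCached(ByNameDemo.expensive()) runs it once. With a real Spark action in place of expensive(), that difference is a whole extra job.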
infinite lazy collections like Stream:
import org.apache.spark.mllib.random.RandomRDDs._
import org.apache.spark.rdd.RDD

val initial = normalRDD(sc, 1000000L, 10)

// Infinite stream of RDDs and actions, and nothing blows up :)
val stream: Stream[RDD[Double]] = Stream(initial).append(
  stream.map {
    case rdd if !rdd.isEmpty =>
      val mu = rdd.mean
      rdd.filter(_ > mu)
    case _ => sc.emptyRDD[Double]
  }
)
Some subset of these should be more than enough to implement complex lazy computations.
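As a rough sketch of how these pieces could be combined for an iterative algorithm with a stopping condition, the plain-Scala example below defers the whole loop until its result is first accessed. Everything here is hypothetical (LazyIteration, Deferred, step, and the halving logic, which merely stands in for one expensive pass over the data). Note that this defers evaluation at the Scala level only; it does not turn the result into a lazily evaluated DataFrame.

```scala
object LazyIteration {
  // Counts how many iterations have actually run.
  var steps = 0

  // Hypothetical iteration step; stands in for one expensive pass over the data.
  def step(x: Double): Double = { steps += 1; x / 2 }

  // Constructing a Deferred runs nothing: Iterator.iterate is lazy,
  // and the lazy val postpones the loop until `result` is first accessed.
  class Deferred(start: Double, tolerance: Double) {
    lazy val result: Double =
      Iterator.iterate(start)(step).dropWhile(_ > tolerance).next()
  }
}
```

For example, val d = new LazyIteration.Deferred(8.0, 1.0) performs no iterations; the loop runs on the first access to d.result, and later accesses reuse the cached value.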