In Apache Spark, how to make an RDD/DataFrame operation lazy?

Posted 2019-07-14 18:26

Suppose I want to write a function foo that transforms a DataFrame:

object Foo {
  def foo(source: DataFrame): DataFrame = {
    // ...complex iterative algorithm with a stopping condition...
  }
}

Since the implementation of foo contains many "actions" (collect, reduce, etc.), calling foo immediately triggers the expensive execution.

This is not a big problem in itself. However, since foo only converts one DataFrame into another, by convention it would be better to allow lazy execution: the implementation of foo should run only when the resulting DataFrame, or a derivative of it, is actually used on the driver (through another "action").

So far, the only way I have found to reliably achieve this is to write the whole implementation as a SparkPlan and superimpose it onto the DataFrame's execution; this is very error-prone and involves a lot of boilerplate code. What is the recommended way to do this?
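For concreteness, here is one hypothetical shape such a body could take. The numeric "score" column and the fixed-point loop are made up for illustration; the point is that every count/first call is an action that fires the moment foo is called:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.avg

def foo(source: DataFrame): DataFrame = {
  // Made-up iterative algorithm: repeatedly keep the rows whose
  // "score" is below the current mean, until the row count stops
  // changing (or the DataFrame becomes empty).
  var current = source
  var previous = -1L
  var size = current.count()                                 // action
  while (size != previous && size > 0) {
    val mu = current.agg(avg("score")).first().getDouble(0)  // action
    current = current.filter(current("score") < mu)
    previous = size
    size = current.count()                                   // action
  }
  current
}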

1 answer

我只想做你的唯一 · 2019-07-14 18:57

It is not exactly clear to me what you are trying to achieve, but Scala itself provides at least a few tools which you may find useful:

  • lazy vals:

    val rdd = sc.range(0, 10000)
    
    lazy val count = rdd.count  // Nothing is executed here
    // count: Long = <lazy>
    
    count  // count is evaluated only when it is actually used 
    // Long = 10000   
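
    Because a lazy val is evaluated at most once, accessing count a second time returns the memoized value; no new Spark job is launched:

    count  // returns the cached result, no job is triggered
    // Long = 10000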
    
  • call-by-name (denoted by => in the function definition):

    def foo(first: => Long, second: => Long, takeFirst: Boolean): Long =
      if (takeFirst) first else second
    
    val rdd1 = sc.range(0, 10000)
    val rdd2 = sc.range(0, 10000)
    
    foo(
      { println("first"); rdd1.count },
      { println("second"); rdd2.count },
      true  // Only first will be evaluated
    )
    // first
    // Long = 10000
    

    Note: In practice you should create a local lazy binding to make sure that the arguments are not re-evaluated on every access, as in the sketch below.
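
    A minimal sketch of that pattern, as a variant of the foo above in which the chosen argument is referenced twice:

    def foo(first: => Long, second: => Long, takeFirst: Boolean): Long = {
      // Bind the by-name parameters to local lazy vals; without this,
      // every reference to `first` below would re-run the computation
      // (including any Spark actions) from scratch.
      lazy val f = first
      lazy val s = second
      if (takeFirst) f + f  // `first` is evaluated only once here
      else s
    }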

  • infinite lazy collections like Stream:

    import org.apache.spark.mllib.random.RandomRDDs._
    import org.apache.spark.rdd.RDD

    val initial = normalRDD(sc, 1000000L, 10)

    // Infinite stream of RDDs and actions and nothing blows up :)
    // (`lazy` makes the recursive reference to `stream` compile even
    // inside a method body; Stream.append takes its argument by name)
    lazy val stream: Stream[RDD[Double]] = Stream(initial).append(
      stream.map {
        case rdd if !rdd.isEmpty =>
          val mu = rdd.mean
          rdd.filter(_ > mu)
        case _ => sc.emptyRDD[Double]
      }
    )
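
    Nothing has actually executed at this point. To drive the iteration to its stopping condition, force only as much of the stream as needed; a Stream memoizes its elements, so each step's actions run at most once (this sketch assumes the initial RDD is non-empty):

    // Keep iterates while they are non-empty; `last` forces that
    // prefix and returns the final non-empty RDD.
    val result: RDD[Double] = stream.takeWhile(rdd => !rdd.isEmpty).last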
    

Some subset of these should be more than enough to implement complex lazy computations.
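
Applied to the question's Foo.foo, a minimal sketch of deferring execution with the tools above (the LazyDF wrapper and the fooLazy name are made up for illustration):

    import org.apache.spark.sql.DataFrame

    // `body` is by-name and captured in a lazy val, so Foo.foo (with
    // all of its internal actions) runs only when `df` is first
    // accessed, and at most once.
    class LazyDF(body: => DataFrame) {
      lazy val df: DataFrame = body
    }

    def fooLazy(source: DataFrame): LazyDF = new LazyDF(Foo.foo(source))

    // Nothing executes at this point...
    // val pending = fooLazy(someSource)
    // ...until the wrapped result is actually needed:
    // pending.df.show()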
