How to checkpoint DataFrames?

2019-03-12 01:50发布

问题:

I'm looking for a way to checkpoint DataFrames. Checkpoint is currently an operation on RDD but I can't find how to do it with DataFrames. persist and cache (which are synonyms for each other) are available for DataFrame but they do not "break the lineage" and are thus unsuitable for methods that could loop for hundreds (or thousands) of iterations.

As an example, suppose that I have a list of functions whose signature is DataFrame => DataFrame. I want to have a way to compute the following even when myfunctions has hundreds or thousands of entries:

def foo(dataset: DataFrame, g: DataFrame => Unit) =
    myfunctions.foldLeft(dataset) {
        case (df, f) =>
            val nextDF = f(df)
            g(nextDF)
            nextDF
   }

回答1:

TL;DR: For Spark versions up to 1.6, to actually get a "checkpointed DF", my suggested solution is based on another answer, but with one extra line:

df.rdd.checkpoint
df.rdd.count
val df2 = sqlContext.createDataFrame(df.rdd, df.schema)
// df2 is checkpointed

Explanation

Updated after further research.

As pointed out, checkpointing a DataFrame directly is not currently (Spark 1.6.1) possible, though there is an issue for it on Spark's Jira.

So, a possible workaround is the one suggested on another answer:

df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.count // An action to compute the checkpoint

However, with this approach, only the df.rdd object will be checkpointed. This can be verified by calling toDebugString to df.rdd:

 scala> df.rdd.toDebugString
 (32) MapPartitionsRDD[1] at rdd at <console>:38 []
  |   ReliableCheckpointRDD[2] at count at <console>:38 []

and then calling toDebugString after a quick transformation to df (please note that I created my DataFrame from a JDBC source), returns the following:

scala> df.withColumn("new_column", lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
 |   MapPartitionsRDD[4] at rdd at <console>:38 []
 |   JDBCRDD[3] at rdd at <console>:38 []

df.explain also shows a hint:

scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)

So, to actually achieve a "checkpointed" DataFrame, I can only think of creating a new one from the checkpointed RDD:

val newDF = sqlContext.createDataFrame(df.rdd, df.schema)
// or
val newDF = df.rdd.map { 
  case Row(val1: Int, ..., valN: Int) => (val1, ..., valN)
}.toDF("col1", ..., "colN")

Then we can verify that the new DataFrame is "checkpointed":

1) newDF.explain:

scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5, col2#6, col3#7]

2) newDF.rdd.toDebugString:

scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

3) With transformation:

scala> newDF.withColumn("new_column", lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
 |   MapPartitionsRDD[11] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

Also, I tried some more complex transformations and I was able to check, in practice, that the newDF object was checkpointed.

Therefore, the only way I found to reliably checkpoint a DataFrame was by checkpointing its associated RDD and creating a new DataFrame object from it.

I hope it helps. Cheers.



回答2:

As of spark 2.1, dataframe has a checkpoint method (see http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset) you can use directly, no need to go through RDD.



回答3:

I think right now you'll have to do

sc.setCheckpointDir("/DIR")
df.rdd.checkpoint

And then you will have to perform your action on the underlying df.rdd. Calling df.ACTION will not work currently, only df.rdd.ACTION



回答4:

Extending to Assaf Mendelson answer,

As of today Spark version 2.2, DataSet#checkpoint() API is Evolving and Experimental

Usage:

Before checkpoint CheckpointDir has to be mentioned using SparkContext

spark.sparkContext.setCheckpointDir("checkpoint/dir/location")

val ds: Dataset[Long] = spark.range(10).repartition('id % 2)

// do checkpoint now, it will preserve partition also
val cp: Dataset[Long] = ds.checkpoint()

How is works internally?

So far the implementation for DataSet checkpoint is to convert the DataSet to RDD then checkpoint it.

// In DataSet.scala 

//API we used in example  
def checkpoint(): Dataset[T] = checkpoint(eager = true)

//Base implementation
def checkpoint(eager: Boolean): Dataset[T] = {
    val internalRdd = queryExecution.toRdd.map(_.copy())
    internalRdd.checkpoint()

    if (eager) {
      internalRdd.count() //To materialize DataSet immediately on checkpoint() call
    }

  ...
}