Why does the Spark DataFrame conversion to RDD req

2020-08-17 05:47发布

问题:

From the Spark source code:

/**
   * Represents the content of the Dataset as an `RDD` of `T`.
   *
   * @group basic
   * @since 1.6.0
   */
  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    rddQueryExecution.toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972

The mapPartitions can take as long as the time to compute the RDD in the first place.. So this makes operations such as

df.rdd.getNumPartitions

very expensive. Given that a DataFrame is DataSet[Row] and a DataSet is composed of RDD's why is a re-mapping required? Any insights appreciated.

回答1:

TL;DR That's because the internal RDD is not RDD[Row].

Given that a DataFrame is DataSet[Row] and a DataSet is composed of RDD's

That's a huge oversimplification. First of all DataSet[T] doesn't mean that you interact with container of T. It means that if you use collection-like API (often referred as strongly typed), internal representation will be decoded into T.

The internal representation is a binary format used internally by Tungsten.This representation is internal and subject of changes and far too low level to be used in practice.

An intermediate representation, which exposes this data is the InternalRow - rddQueryExecution.toRDD is in fact RDD[InternalRow]. This representation (there are different implementation) still exposes the internal types, is consider "weakly" private, as all objects in o.a.s.sql.catalyst (the access is not explicitly restricted, but API is not documented), and rather tricky to interact with.

This where decoding comes into play and why you need full "re-mapping" - to convert internal, often unsafe, objects into external types intended for public usage.

Finally, to reiterate my previous statement - the code in question won't be executed when getNumPartitions is called.