Spark DataFrame column names not passed to slave nodes?

Posted 2019-05-03 23:34

I'm applying a function, let's say f(), to the rows of a DataFrame (call it df) via the map method, but I see a NullPointerException when calling collect on the resulting RDD if df.columns is passed as an argument to f().

The following Scala code, which can be pasted into a spark-shell, shows a minimal example of the issue (see the function prepRDD_buggy()). I've also posted my current workaround in the function prepRDD(), where the only difference is that the column names are passed as a val instead of as df.columns.

Can a Spark expert please point out the precise reason why this is happening, or confirm our hypothesis that slave nodes do not get the DataFrame column names?

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// A Simple DataFrame
val dataRDD: RDD[Row] = sc.parallelize(Array(
  Row(1.0,2.1,3.3),
  Row(3.4,5.9,8.9),
  Row(3.1,2.3,4.1)))
val struct: StructType = StructType(
  StructField("y", DoubleType, false) ::
  StructField("x1", DoubleType, false) ::
  StructField("x2", DoubleType, false) :: Nil)
val df: DataFrame = sqlContext.createDataFrame(dataRDD, struct)

// Make LabeledPoint object from Row objects
def makeLP(row: Row, colnames: Array[String]) =
  LabeledPoint(row.getDouble(0), 
    Vectors.dense((1 until row.length).toArray map (i => row.getDouble(i))))

// Make RDD[LabeledPoint] from DataFrame
def prepRDD_buggy(df: DataFrame): RDD[LabeledPoint] = {
  df map (row => makeLP(row, df.columns))
}
val mat_buggy = prepRDD_buggy(df) 
mat_buggy.collect // throws NullPointerException !

// Make RDD[LabeledPoint] from DataFrame
def prepRDD(df: DataFrame): RDD[LabeledPoint] = {
  val cnames = df.columns
  df map (row => makeLP(row, cnames))
}
val mat = prepRDD(df) 
mat.collect // Works fine

Here are the first few lines of the (very verbose) error message that I see on running mat_buggy.collect inside my spark-shell.

15/12/24 18:09:28 INFO SparkContext: Starting job: collect at <console>:42
15/12/24 18:09:28 INFO DAGScheduler: Got job 0 (collect at <console>:42) with 2 output partitions
15/12/24 18:09:28 INFO DAGScheduler: Final stage: ResultStage 0(collect at <console>:42)
15/12/24 18:09:28 INFO DAGScheduler: Parents of final stage: List()
15/12/24 18:09:28 INFO DAGScheduler: Missing parents: List()
15/12/24 18:09:28 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at map at <console>:38), which has no missing parents
15/12/24 18:09:28 INFO MemoryStore: ensureFreeSpace(11600) called with curMem=0, maxMem=560993402
15/12/24 18:09:28 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 11.3 KB, free 535.0 MB)
15/12/24 18:09:28 INFO MemoryStore: ensureFreeSpace(4540) called with curMem=11600, maxMem=560993402
15/12/24 18:09:28 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.4 KB, free 535.0 MB)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.10.98:53386 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:28 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
15/12/24 18:09:28 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at map at <console>:38)
15/12/24 18:09:28 INFO YarnScheduler: Adding task set 0.0 with 2 tasks
15/12/24 18:09:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, ip-10-10-10-217.ec2.internal, PROCESS_LOCAL, 2385 bytes)
15/12/24 18:09:28 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, ip-10-10-10-213.ec2.internal, PROCESS_LOCAL, 2385 bytes)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-10-10-213.ec2.internal:56642 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-10-10-217.ec2.internal:56396 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:29 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-10-10-217.ec2.internal): java.lang.NullPointerException
    at org.apache.spark.sql.DataFrame.schema(DataFrame.scala:290)
    at org.apache.spark.sql.DataFrame.columns(DataFrame.scala:306)
    at $line34.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$prepRDD_buggy$1.apply(<console>:38)
    at $line34.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$prepRDD_buggy$1.apply(<console>:38)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)

1 answer
Answered 2019-05-04 00:11

Your hypothesis is correct. columns requires access to the schema, and the schema depends on queryExecution, which is transient and hence won't be shipped to the workers. So what you're doing in prepRDD is more or less correct, although the same information can be extracted directly from the rows:

scala> df.rdd.map(_.schema.fieldNames).first
res14: Array[String] = Array(y, x1, x2)
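
Applied to the code in the question, that suggestion could look like the sketch below (untested; prepRDD_fromRows is a hypothetical name, and it assumes the rows produced by the map carry their schema, as GenericRowWithSchema does in Spark 1.x):

// Sketch: read the column names from each Row instead of from the DataFrame,
// so nothing transient is captured by the closure
def prepRDD_fromRows(df: DataFrame): RDD[LabeledPoint] = {
  df map (row => makeLP(row, row.schema.fieldNames))
}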

On a side note, a VectorAssembler plus a simple map would be a better choice here.
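
A minimal sketch of that approach (untested; it assumes the Spark 1.x ML API, where VectorAssembler stores mllib Vectors in the DataFrame, and the names assembler and lps are just for illustration):

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint

// Assemble the feature columns into a single vector column named "features"
val assembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2"))
  .setOutputCol("features")

// Map each (label, features) pair to a LabeledPoint; only plain row values
// are touched inside the closure, so nothing transient is captured
val lps = assembler.transform(df)
  .select("y", "features")
  .map(row => LabeledPoint(row.getDouble(0), row.getAs[Vector](1)))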
