I'm applying a function, let's say f(), via the map method to the rows of a DataFrame (call it df), but I see a NullPointerException when calling collect on the resulting RDD if df.columns is passed as an argument to f().
The following Scala code, which can be pasted inside a spark-shell, shows a minimal example of the issue (see the function prepRDD_buggy()). I've also posted my current workaround in the function prepRDD(), where the only difference is that the column names are passed as a val instead of as df.columns.
Can a Spark expert please point out the precise reason why this is happening, or confirm my hypothesis that slave nodes do not get DataFrame column names?
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
// A Simple DataFrame
val dataRDD: RDD[Row] = sc.parallelize(Array(
  Row(1.0,2.1,3.3),
  Row(3.4,5.9,8.9),
  Row(3.1,2.3,4.1)))
val struct: StructType = StructType(
  StructField("y", DoubleType, false) ::
  StructField("x1", DoubleType, false) ::
  StructField("x2", DoubleType, false) :: Nil)
val df: DataFrame = sqlContext.createDataFrame(dataRDD, struct)
// Make LabeledPoint object from Row objects
def makeLP(row: Row, colnames: Array[String]) =
  LabeledPoint(row.getDouble(0),
    Vectors.dense((1 until row.length).toArray map (i => row.getDouble(i))))
// Make RDD[LabeledPoint] from DataFrame
def prepRDD_buggy(df: DataFrame): RDD[LabeledPoint] = {
  df map (row => makeLP(row, df.columns))
}
val mat_buggy = prepRDD_buggy(df)
mat_buggy.collect // throws NullPointerException !
// Make RDD[LabeledPoint] from DataFrame
def prepRDD(df: DataFrame): RDD[LabeledPoint] = {
  val cnames = df.columns
  df map (row => makeLP(row, cnames))
}
val mat = prepRDD(df)
mat.collect // Works fine
Here are the first few lines of the (very verbose) error message that I see on running mat_buggy.collect inside my spark-shell.
15/12/24 18:09:28 INFO SparkContext: Starting job: collect at <console>:42
15/12/24 18:09:28 INFO DAGScheduler: Got job 0 (collect at <console>:42) with 2 output partitions
15/12/24 18:09:28 INFO DAGScheduler: Final stage: ResultStage 0(collect at <console>:42)
15/12/24 18:09:28 INFO DAGScheduler: Parents of final stage: List()
15/12/24 18:09:28 INFO DAGScheduler: Missing parents: List()
15/12/24 18:09:28 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at map at <console>:38), which has no missing parents
15/12/24 18:09:28 INFO MemoryStore: ensureFreeSpace(11600) called with curMem=0, maxMem=560993402
15/12/24 18:09:28 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 11.3 KB, free 535.0 MB)
15/12/24 18:09:28 INFO MemoryStore: ensureFreeSpace(4540) called with curMem=11600, maxMem=560993402
15/12/24 18:09:28 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.4 KB, free 535.0 MB)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.10.98:53386 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:28 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
15/12/24 18:09:28 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at map at <console>:38)
15/12/24 18:09:28 INFO YarnScheduler: Adding task set 0.0 with 2 tasks
15/12/24 18:09:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, ip-10-10-10-217.ec2.internal, PROCESS_LOCAL, 2385 bytes)
15/12/24 18:09:28 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, ip-10-10-10-213.ec2.internal, PROCESS_LOCAL, 2385 bytes)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-10-10-213.ec2.internal:56642 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-10-10-217.ec2.internal:56396 (size: 4.4 KB, free: 535.0 MB)
15/12/24 18:09:29 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-10-10-217.ec2.internal): java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.schema(DataFrame.scala:290)
at org.apache.spark.sql.DataFrame.columns(DataFrame.scala:306)
at $line34.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$prepRDD_buggy$1.apply(<console>:38)
at $line34.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$prepRDD_buggy$1.apply(<console>:38)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
Your hypothesis is correct. columns requires access to schema, and schema depends on queryExecution, which is transient and hence won't be shipped to the workers.
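To see the mechanism in isolation, here is a minimal, Spark-free sketch, meant as a small standalone program rather than a shell paste (shell-defined classes drag their REPL wrapper into serialization); the names Holder and TransientDemo are just for illustration. A field marked @transient is skipped by Java serialization and comes back as null on the receiving side, which is what happens to the DataFrame's queryExecution inside a task closure:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// A serializable holder whose payload is marked @transient,
// mimicking how DataFrame declares queryExecution.
class Holder(@transient val payload: String) extends Serializable

object TransientDemo extends App {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(new Holder("only exists on the driver"))
  oos.close()

  val restored = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    .readObject().asInstanceOf[Holder]

  // The transient field was not serialized, so it comes back as null.
  println(restored.payload) // prints: null
}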
So what you're doing in prepRDD is more or less correct, although the same information can be extracted directly from the rows themselves, as sketched below.
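A minimal sketch of that, assuming the rows produced by the DataFrame carry their own schema (they do for rows coming out of a DataFrame in recent Spark releases); nothing driver-side is captured in the closure:

// Hypothetical variant of prepRDD_buggy: read the column names from each Row's
// own schema instead of from the DataFrame captured in the closure.
def prepRDD_fromRows(df: DataFrame): RDD[LabeledPoint] = {
  df map (row => makeLP(row, row.schema.fieldNames))
}

val mat_fromRows = prepRDD_fromRows(df)
mat_fromRows.collect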
On a side note, VectorAssembler plus a simple map would be a better choice here.
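A rough sketch of that approach, assuming the Spark 1.x APIs used in the question (where the ml VectorAssembler still produces mllib vectors) and the column names from the example above:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.Row

// Assemble the feature columns into a single vector column...
val assembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2"))
  .setOutputCol("features")

// ...then map each (label, features) pair to a LabeledPoint.
val mat2 = assembler.transform(df)
  .select("y", "features")
  .map { case Row(y: Double, features: Vector) => LabeledPoint(y, features) }

mat2.collect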