I'm getting an NPE (NullPointerException) when trying to coalesce and save an RDD.
The code works locally and in the Scala shell on the cluster, but throws the error when I submit it as a job to the cluster.
I've tried printing a sample with take() to check whether the RDD contains null data, but that throws the same error, which is a pain because it works fine in the shell.
I'm saving to HDFS and the variable holds the full URL path; the model saves fine this way during the MLlib training phase.
Any ideas much appreciated!
Scala Code (Whole Prediction Func):
// Load the random forest
val rfModel = RandomForestModel.load(sc, modelPath)
// Make the predictions; here the label is the unique ID of the point
val rfPreds = labDistVect.map(p => (p.label, rfModel.predict(p.features)))
// Coalesce to a single partition and save (saveAsTextFile returns Unit, so no val is needed)
println("Done Modelling, now saving preds")
rfPreds.coalesce(1, shuffle = true).saveAsTextFile(outPreds)
println("Done Modelling, now saving coords")
coords.coalesce(1, shuffle = true).saveAsTextFile(outCoords)
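For reference, the null-data check mentioned above could be sketched roughly as follows. This is only an illustration, not the code from the job: the object name NullCheckSketch, the helper countNullRecords, the local[2] master, and the sample data are all made up, and it assumes the RDD holds MLlib LabeledPoint values. It counts bad records on the executors instead of pulling them to the driver with take().

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Hypothetical helper object; none of these names come from the original job.
object NullCheckSketch {

  // Count records that are null themselves or carry a null feature vector.
  def countNullRecords(points: RDD[LabeledPoint]): Long =
    points.filter(p => p == null || p.features == null).count()

  def main(args: Array[String]): Unit = {
    // Local master is an assumption so the sketch can run without a cluster.
    val conf = new SparkConf().setAppName("NullCheckSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Made-up stand-in for labDistVect, with one deliberately null record.
      val sample: RDD[LabeledPoint] = sc.parallelize(Seq(
        LabeledPoint(1.0, Vectors.dense(0.1, 0.2)),
        null,
        LabeledPoint(3.0, Vectors.dense(0.5, 0.6))
      ))
      println(s"Records with a null point or null features: ${countNullRecords(sample)}")
    } finally {
      sc.stop()
    }
  }
}
```

Because RDDs are evaluated lazily, a check like this still runs every upstream transformation, so if the NPE is thrown inside an earlier closure it will fail with the same error as take().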
Stack Trace:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 40, XX.XX.XX.XX): java.lang.NullPointerException
at GeoDistPredict1$$anonfun$38.apply(GeoDist1.scala:340)
at GeoDistPredict1$$anonfun$38.apply(GeoDist1.scala:340)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)