The variables I share via broadcast are null in the cluster.
My application is quite complex, so I have written this small example, which works flawlessly when I run it locally but fails in the cluster:
package com.gonzalopezzi.bigdata.bicing

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 extends App {
  val conf = new SparkConf().setAppName("PruebaBroadcast2")
  val sc = new SparkContext(conf)

  val arr : Array[Int] = (6 to 9).toArray
  val broadcasted = sc.broadcast(arr)

  // The small integer sequence [1, 2, 3, 4] is parallelized into two partitions
  val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

  // NullPointerException in the flatMap: broadcasted is null on the executors
  rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println)
}
I don't know if the problem is a coding error or a configuration issue.
This is the stacktrace I get:
15/07/07 20:55:13 INFO scheduler.DAGScheduler: Job 0 failed: collect at PruebaBroadcast2.scala:24, took 0.992297 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-36-49.ec2.internal): java.lang.NullPointerException
at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Command exiting with ret '1'
Can anyone help me fix this? At least, can you tell me if you see something strange in the code? If you think the code is ok, please tell me, as it would mean that the problem is in the configuration of the cluster.
Thanks in advance.
I had a similar issue: I had a variable that I used inside an RDD map function, and its value came out as null. This is my original code:
I found that it works when the variable is used inside any function, not just main():
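The snippets from this answer are not preserved here. A minimal sketch of the pattern it describes might look like the following, assuming Spark 1.3 or later on the classpath. All names (`PrefixExample`, `prefix`, `addPrefix`) are hypothetical and are not the answerer's original code.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical illustration only; not the original code from the answer.
object PrefixExample extends App {
  val sc = new SparkContext(new SparkConf().setAppName("PrefixExample"))

  val prefix = "item-"
  val input: RDD[String] = sc.parallelize(Seq("a", "b", "c"), 2)

  // Problematic on a cluster: `prefix` is a field of the App object,
  // so the closure refers to the object and executors may see null.
  // val bad = input.map(s => prefix + s)

  // Here the closure captures only the method parameter `p`,
  // a plain local value that is serialized along with the task.
  def addPrefix(rdd: RDD[String], p: String): RDD[String] =
    rdd.map(s => p + s)

  addPrefix(input, prefix).collect().foreach(println)
  sc.stop()
}
```

The point of the sketch is that the value reaches the closure as a local parameter rather than through a field of the enclosing object.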
Finally I got it working.
It doesn't work if the object is declared like this:
But it works if you declare the object with a main function:
So, the short example in the question works if I rewrite it this way:
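The rewritten code itself is missing from this copy of the answer. A reconstruction under the approach described above (an explicit `main` method instead of `extends App`) could look roughly like this. It reuses the names from the question; the `sc.stop()` call is an addition, and Spark 1.3 or later is assumed so that `reduceByKey` is available without extra imports.

```scala
package com.gonzalopezzi.bigdata.bicing

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of the question's example with an explicit main method.
object PruebaBroadcast2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PruebaBroadcast2")
    val sc = new SparkContext(conf)

    // These are now local values of main, not fields of the object,
    // so the closure below captures the broadcast handle directly.
    val arr: Array[Int] = (6 to 9).toArray
    val broadcasted = sc.broadcast(arr)

    val rdd: RDD[Int] = sc.parallelize(1 to 4, 2)

    rdd.flatMap((a: Int) => List((a, broadcasted.value(0))))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)

    sc.stop() // added for tidiness; not part of the question's code
  }
}
```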
This problem seems related to this bug: https://issues.apache.org/jira/browse/SPARK-4170
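As far as I understand it, the cause is that in Scala 2 the `App` trait relies on `DelayedInit`: the body of the object, including its field assignments, only runs when `main` is invoked. On the executors the object is loaded but its `main` is never called, so its fields keep their default value of null. The following Spark-free sketch tries to show that effect in isolation. It assumes Scala 2 (the behavior differs in Scala 3), and `Holder` and `DelayedInitDemo` are hypothetical names.

```scala
// Hypothetical names; assumes Scala 2, where App is built on DelayedInit.
object Holder extends App {
  // This assignment is part of the delayed body and runs only inside Holder.main.
  val values: Array[Int] = Array(6, 7, 8, 9)
}

object DelayedInitDemo {
  def main(args: Array[String]): Unit = {
    // Holder.main has not run yet, so the assignment to `values` has not executed.
    println(s"before Holder.main: values is null? ${Holder.values == null}")

    // Running Holder's main executes the delayed body and populates the field.
    Holder.main(Array.empty[String])
    println(s"after Holder.main: values is null? ${Holder.values == null}")
  }
}
```

Running `DelayedInitDemo` mimics what an executor sees: it touches the object without ever going through its `main`.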