Spark broadcasted variable returns NullPointerExce

Posted 2019-06-19 22:50

The variables I share via broadcast are null in the cluster.

My application is quite complex, so I have written this small example. It works flawlessly when I run it locally but fails in the cluster:

package com.gonzalopezzi.bigdata.bicing

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 extends App {
  val conf = new SparkConf().setAppName("PruebaBroadcast2")
  val sc = new SparkContext(conf)

  val arr : Array[Int] = (6 to 9).toArray
  val broadcasted = sc.broadcast(arr)

  val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2) // a small integer sequence [1, 2, 3, 4] is parallelized into two partitions
  rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println)  // NullPointerException in the flatMap: broadcasted is null

}

I don't know if the problem is a coding error or a configuration issue.

This is the stacktrace I get:

15/07/07 20:55:13 INFO scheduler.DAGScheduler: Job 0 failed: collect at PruebaBroadcast2.scala:24, took 0.992297 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-36-49.ec2.internal): java.lang.NullPointerException
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Command exiting with ret '1'

Can anyone help me fix this? At least, can you tell me if you see something strange in the code? If you think the code is ok, please tell me, as it would mean that the problem is in the configuration of the cluster.

Thanks in advance.

2 Answers
[This account has been banned]
#2 · 2019-06-19 23:22

I had a similar issue. I declared a variable, used it inside an RDD map function, and got a null value. This is my original code:

object MyClass extends App {
    ...
    val prefix = "prefix" 
    val newRDD = inputRDD.map(s => prefix + s) // got null for prefix
    ...
}

I found that it works when the value is passed into any function, not just main():

object MyClass extends App {
    ...
    val prefix = "prefix" 
    val newRDD = addPrefix(inputRDD, prefix)
    def addPrefix(input: RDD[String], prefix: String): RDD[String] = {
        input.map(s => prefix + s) // prefix here is the method parameter, not the object's field
    }
}
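
A possible reading of why this workaround helps, sketched without Spark: a function that reads a field of the object looks that field up when it runs, while a function built from a method parameter carries the value inside itself. The names PrefixHolder, viaField, viaParam and CaptureDemo are hypothetical, and the sketch assumes Scala 2, where the body of an object extending App only runs once its main is called.

```scala
// Hypothetical names; assumes Scala 2, where App defers the object body until main.
object PrefixHolder extends App {
  val prefix: String = "prefix"

  // Reads the field PrefixHolder.prefix each time the function is applied.
  def viaField: String => String = s => prefix + s

  // Captures the argument p inside the function value itself.
  def viaParam(p: String): String => String = s => p + s
}

object CaptureDemo {
  def main(args: Array[String]): Unit = {
    // PrefixHolder.main is never called here, so its body has not run.
    // This stands in for an executor JVM that loads the object without running it.
    println(PrefixHolder.viaField("x"))            // the field has not been initialized
    println(PrefixHolder.viaParam("prefix")("x"))  // the value travels with the closure
  }
}
```

In the Spark case the same distinction would apply to the closure shipped to the executors: a captured parameter is serialized along with it, whereas a field of the object is looked up again on the executor.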
我欲成王,谁敢阻挡
#3 · 2019-06-19 23:30

Finally I got it working.

It doesn't work if the object is declared like this:

object MyObject extends App {

But it works if you declare an object with a main function:

object MyObject {
    def main(args: Array[String]): Unit = {
        /* ... */
    }
}

So, the short example in the question works if I rewrite it this way:

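import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 {

  def main(args: Array[String]): Unit = {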
    val conf = new SparkConf().setAppName("PruebaBroadcast2")
    val sc = new SparkContext(conf)

    val arr : Array[Int] = (6 to 9).toArray
    val broadcasted = sc.broadcast(arr)

    val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

    rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println)
  }
}

This problem seems related to this bug: https://issues.apache.org/jira/browse/SPARK-4170
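
A likely mechanism, sketched without Spark and assuming Scala 2: scala.App relies on DelayedInit, so the statements in the object body, including val initializers, are deferred until main is invoked. A JVM that loads the object without calling its main sees the fields at their default value, null. Deferred and DelayedInitDemo are hypothetical names used only for illustration.

```scala
// Hypothetical names; the behaviour described assumes Scala 2 (DelayedInit-based App).
object Deferred extends App {
  val greeting: String = "hello"
}

object DelayedInitDemo {
  // Returns the field as seen before and after Deferred.main runs.
  def observe(): (String, String) = {
    val before = Deferred.greeting      // object body has not run yet
    Deferred.main(Array.empty[String])  // runs the deferred body
    val after = Deferred.greeting       // now initialized
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = observe()
    println(s"before main: $before")
    println(s"after main:  $after")
  }
}
```

With an explicit main method there is no deferred body: the values are ordinary locals of main and are captured by the closure directly.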
