I'm running this snippet to sort an RDD of points, ordering the RDD and taking the K-nearest points from a given point:
def getKNN(sparkContext:SparkContext, k:Int, point2:Array[Double], pointsRDD:RDD[Array[Double]]): RDD[Array[Double]] = {
val tuplePointDistanceRDD:RDD[(Double, Array[Double])] = pointsRDD.map(point =>
(DistanceUtils.euclidianDistance(point, point2), point))
sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k))
}
Using just one SparkContext in my application and passing it as a parameter to my function, I'm getting a org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
error at the moment I call sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k))
to get the KNN points from point2
.
I'm constructing sparkContext
as this snippet bellow:
var sparkContext = new SparkContext("local", "<app_name>")
What would be the possible causes of facing this kind of error?
Basically this is the LOG of my standalone spark environment with the stack trace of this error:
15/12/24 11:55:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:55731]
15/12/24 11:55:29 INFO Utils: Successfully started service 'sparkDriver' on port 55731.
15/12/24 11:55:29 INFO SparkEnv: Registering MapOutputTracker
15/12/24 11:55:29 INFO SparkEnv: Registering BlockManagerMaster
15/12/24 11:55:29 INFO DiskBlockManager: Created local directory at /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/blockmgr-70e73cfe-683b-4297-aa5d-de38f98d02f1
15/12/24 11:55:29 INFO MemoryStore: MemoryStore started with capacity 491.7 MB
15/12/24 11:55:29 INFO HttpFileServer: HTTP File server directory is /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/spark-f7bc8b6f-7d4f-4c55-8dff-0fbc4f6c2532/httpd-fb502369-4c28-4585-a37e-f3645d1d55a3
15/12/24 11:55:29 INFO HttpServer: Starting HTTP Server
15/12/24 11:55:29 INFO Utils: Successfully started service 'HTTP file server' on port 55732.
15/12/24 11:55:29 INFO SparkEnv: Registering OutputCommitCoordinator
15/12/24 11:55:29 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/12/24 11:55:29 INFO SparkUI: Started SparkUI at http://localhost:4040
15/12/24 11:55:29 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/12/24 11:55:29 INFO Executor: Starting executor ID driver on host localhost
15/12/24 11:55:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55733.
15/12/24 11:55:29 INFO NettyBlockTransferService: Server created on 55733
15/12/24 11:55:29 INFO BlockManagerMaster: Trying to register BlockManager
15/12/24 11:55:29 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55733 with 491.7 MB RAM, BlockManagerId(driver, localhost, 55733)
15/12/24 11:55:29 INFO BlockManagerMaster: Registered BlockManager
15/12/24 11:55:30 INFO TorrentBroadcast: Started reading broadcast variable 0
org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:144)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.RDD.sortBy$default$3(RDD.scala:548)
at LOF$.getKNN(LOF.scala:14)
at LOF$.lof(LOF.scala:25)
at BehaviourActivityScoreJudgeTest$$anonfun$1.apply$mcV$sp(BehaviourActivityScoreJudgeTest.scala:14)
at BehaviourActivityScoreJudgeTest$$anonfun$1.apply(BehaviourActivityScoreJudgeTest.scala:11)
at BehaviourActivityScoreJudgeTest$$anonfun$1.apply(BehaviourActivityScoreJudgeTest.scala:11)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647)
at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)
at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1683)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1714)
at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1683)
at org.scalatest.Suite$class.run(Suite.scala:1424)
at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1683)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1760)
at BehaviourActivityScoreJudgeTest.org$scalatest$BeforeAndAfterAll$$super$run(BehaviourActivityScoreJudgeTest.scala:4)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
at BehaviourActivityScoreJudgeTest.run(BehaviourActivityScoreJudgeTest.scala:4)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
at org.scalatest.tools.Runner$.run(Runner.scala:883)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:137)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
... 94 more
15/12/24 11:55:30 INFO SparkUI: Stopped Spark web UI at http://localhost:4040
15/12/24 11:55:30 INFO DAGScheduler: Stopping DAGScheduler
15/12/24 11:55:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/12/24 11:55:30 INFO MemoryStore: MemoryStore cleared
15/12/24 11:55:30 INFO BlockManager: BlockManager stopped
15/12/24 11:55:30 INFO BlockManagerMaster: BlockManagerMaster stopped
15/12/24 11:55:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/12/24 11:55:30 INFO SparkContext: Successfully stopped SparkContext
15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
For me helped this, because SparkContext was already created
Before i tried with this
But it was broken when i ran
I was also facing the same issue. after a lot of googling I found that I have made a singleton class for SparkContext initialization which is only valid for a single JVM instance, but in case of Spark this singleton class will be invoked from each worker node running on separate JVM instance and hence lead to multiple SparkContext object.
I was getting this error as well. I haven't really seen any concrete coding examples, so I will share my solution. This cleared the error for me, but I have a sense that there may be more than 1 solution to this problem. But this would be worth a go as it keeps everything within the code.
It looks as though the SparkContext was shutting down, thus throwing the error. I think the issue is that the SparkContext is created in a class and then extended to other classes. The extension causes it to shut down, which is a bit annoying. Below is the implementation I used to get this error to clear.
Spark Initialisation Class:
Main Class:
Then just extend your other class with the the Spark Class and it should all work out.
I was getting the error running LogisticRegression Models, so I would assume this should fix it for you as well with other Machine Learning libraries as well.
Just discovered why I was getting this exception: for a reason my
SparkContext
object started/stopped several times betweenScalaTest
methods. So, fixing that behaviour lead me to get spark working in the right way I would expect.Related to the above answers, I encountered this issue when I inadvertently serialized a datastax connector (i.e Cassandra connection driver) query to a spark slave. This then spun off its own SparkContext and within 4 seconds the entire application had crashed