Spark multiple contexts

Posted 2020-02-05 07:58

Question:

In short:

EC2 cluster: 1 master 3 slaves

Spark version : 1.3.1

I want to use the spark.driver.allowMultipleContexts option to run two contexts: one local (on the master only) and one on the cluster (master and slaves).

I get the following stack trace (line 29 of Main.scala is where I reference the object that initializes the second SparkContext):

fr.entry.Main.main(Main.scala)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1812)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1808)
   at scala.Option.foreach(Option.scala:236)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1808)
   at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1795)
   at scala.Option.foreach(Option.scala:236)
   at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:1795)
   at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:1847)
   at org.apache.spark.SparkContext.<init>(SparkContext.scala:1754)
   at fr.entry.cluster$.<init>(Main.scala:79)
   at fr.entry.cluster$.<clinit>(Main.scala)
   at fr.entry.Main$delayedInit$body.apply(Main.scala:29)
   at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
   at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
   at scala.App$$anonfun$main$1.apply(App.scala:71)
   at scala.App$$anonfun$main$1.apply(App.scala:71)
   at scala.collection.immutable.List.foreach(List.scala:318)
   at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
   at scala.App$class.main(App.scala:71)
   at fr.entry.Main$.main(Main.scala:14)
   at fr.entry.Main.main(Main.scala)
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now LOADING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/0 is now RUNNING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/1 is now RUNNING
15/09/28 15:33:30 INFO SparkContext: Starting job: sum at Main.scala:29
15/09/28 15:33:30 INFO DAGScheduler: Got job 0 (sum at Main.scala:29) with 2 output partitions (allowLocal=false)
15/09/28 15:33:30 INFO DAGScheduler: Final stage: Stage 0(sum at Main.scala:29)
15/09/28 15:33:30 INFO DAGScheduler: Parents of final stage: List()
15/09/28 15:33:30 INFO DAGScheduler: Missing parents: List()
15/09/28 15:33:30 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29), which has no missing parents
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=0, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.2 KB, free 51.8 GB)
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(1656) called with curMem=2264, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1656.0 B, free 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40476 (size: 1656.0 B, free: 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/09/28 15:33:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now RUNNING
15/09/28 15:33:30 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29)
15/09/28 15:33:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/09/28 15:33:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/09/28 15:34:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

More details:

I would like to run one program that does two things. First, I create a local SparkContext (on the master only), build an RDD and run some operations on it. Second, I initialize a second SparkContext with the master and the 3 slaves, which also builds an RDD and runs some operations. So in the first case I want to use the 16 cores of the master, and in the second case the 8 cores x 3 of the slaves.

Simple example:

val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))
println(local.sparkContext.makeRDD(arr).count()) 
println(cluster.sparkContext.makeRDD(arr).map(l => l.sum).sum)

My two SparkContexts:

object local {

  val project = "test"
  val version = "1.0"

  val sc = new SparkConf()
    .setMaster("local[16]")
    .setAppName("Local")
    .set("spark.local.dir", "/mnt")
    .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar"))
    .setSparkHome("/root/spark")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.executor.memory", "45g")

  val sparkContext = new SparkContext(sc)
}

object cluster {

  val project = "test"
  val version = "1.0"

  val sc = new SparkConf()
   .setMaster(masterURL)  // ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com
   .setAppName("Cluster")
   .set("spark.local.dir", "/mnt")
   .setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar") ++ otherJars)
   .setSparkHome("/root/spark")
   .set("spark.driver.allowMultipleContexts", "true")
   .set("spark.executor.memory", "35g")

  val sparkContext = new SparkContext(sc)
}

How can I fix this?

Answer 1:

Although the configuration option spark.driver.allowMultipleContexts exists, it is misleading, because using multiple Spark contexts is discouraged. The option is used only for Spark internal tests and is not supposed to be used in user programs. You can get unexpected results when running more than one Spark context in a single JVM.



Answer 2:

If coordination is required between the 2 programs, it is better to make them part of a single Spark application, to take advantage of Spark's internal optimizations and to avoid unnecessary I/O.
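For illustration, a minimal sketch of that single-application approach applied to the asker's example (the master URL is a placeholder and the partition counts are assumptions; note that this does not pin work to the master node the way local[16] does, it just keeps both steps inside one application and one SparkContext):

import org.apache.spark.{SparkConf, SparkContext}

object SingleApp {
  def main(args: Array[String]): Unit = {
    // One context for everything; placeholder master URL
    val conf = new SparkConf()
      .setMaster("spark://<master-host>:7077")
      .setAppName("SingleApp")
    val sc = new SparkContext(conf)

    val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))

    // Small step: few partitions (tasks still run on executors, not on the driver)
    println(sc.makeRDD(arr, numSlices = 2).count())

    // Cluster step: spread the work over the slaves with more partitions
    println(sc.makeRDD(arr, numSlices = 24).map(_.sum).sum())

    sc.stop()
  }
}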

Secondly, if the 2 applications do not need to coordinate in any way, you can launch 2 separate applications. Since you are using Amazon EC2/EMR, you can use YARN as your resource manager without significant time investment, as described here.



Answer 3:

If you really need to work with many Spark contexts, you can turn on the special option [MultipleContexts](1), but it is used only for Spark internal tests and is not supposed to be used in user programs. You will get unexpected behavior when running more than one Spark context in a single JVM [SPARK-2243](2). However, it is possible to create the different contexts in separate JVMs and to manage each context at the SparkConf level so that it optimally fits its jobs.

There is a middleware on top of Spark, [Mist], that does exactly this: it manages Spark contexts and multiple JVMs, so you can have different jobs such as an ETL pipeline, a fast forecast job, an ad-hoc Hive query and a Spark Streaming application running in parallel on the same cluster. Mist creates every new SparkContext in its own JVM.

(1) github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/SparkContextSuite.scala#L67

(2) issues.apache.org/jira/browse/SPARK-2243
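For completeness, a minimal sketch of the "separate JVMs" idea without Mist (the class names, jar path and master URL are all placeholders): a small launcher spawns one spark-submit process per job, so each SparkContext lives in its own JVM.

import scala.sys.process._

object Launcher {
  def main(args: Array[String]): Unit = {
    val jar = "target/scala-2.10/test-assembly-1.0.jar"  // placeholder jar path

    // Each spark-submit starts a separate JVM with its own SparkContext
    val localJob = Seq("spark-submit", "--master", "local[16]",
      "--class", "fr.entry.LocalJob", jar)
    val clusterJob = Seq("spark-submit", "--master", "spark://<master-host>:7077",
      "--class", "fr.entry.ClusterJob", jar)

    // Run them sequentially; wrap them in Futures to run in parallel instead
    val localExit = localJob.!
    val clusterExit = clusterJob.!
    println(s"local job exited with $localExit, cluster job with $clusterExit")
  }
}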



Answer 4:

Java:

 .set("spark.driver.allowMultipleContexts", "true")

+

sparkContext.cancelAllJobs();
sparkContext.stop();

It works for me.
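In Scala, the same pattern might look like the sketch below: release the first context completely before creating the second one, so only one SparkContext is ever active at a time (the master URL and the configs are placeholders, not the asker's full settings).

import org.apache.spark.{SparkConf, SparkContext}

object Main extends App {
  val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))

  // First context: local work on the master only
  val localSc = new SparkContext(
    new SparkConf().setMaster("local[16]").setAppName("Local"))
  println(localSc.makeRDD(arr).count())

  // Shut it down completely before the next context is created
  localSc.cancelAllJobs()
  localSc.stop()

  // Second context: cluster work on the slaves (placeholder master URL)
  val clusterSc = new SparkContext(
    new SparkConf().setMaster("spark://<master-host>:7077").setAppName("Cluster"))
  println(clusterSc.makeRDD(arr).map(_.sum).sum())
  clusterSc.stop()
}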