How to create RDD from within Task?

Posted 2019-06-07 11:42

Question:

Normally, when creating an RDD from a List you can just use the SparkContext.parallelize method, but you cannot use the SparkContext from within a Task, as it is not serializable. I need to create an RDD from a list of Strings from within a task. Is there a way to do this?
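
Roughly the pattern I'm after (a simplified sketch, the names are made up):

// Simplified sketch (names made up): the closure passed to map captures
// `sc`, so Spark rejects it when it tries to ship the task to an executor.
val groups = sc.parallelize(Seq(Seq("a", "b"), Seq("c", "d")))

val nested = groups.map { strings =>
  sc.parallelize(strings)   // fails: SparkContext cannot be used inside a task
}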

I've tried creating a new SparkContext in the task, but it gives me an error about not supporting multiple Spark contexts in the same JVM and says that I need to set spark.driver.allowMultipleContexts = true. According to the Apache user group, however, that setting does not yet seem to be supported.
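
What I attempted inside the task was roughly this (a sketch, names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch of the attempt: building a second SparkContext inside a task.
// Without the flag below Spark throws "Only one SparkContext may be
// running in this JVM", and with it the behaviour is still unsupported.
val innerConf = new SparkConf()
  .setAppName("inner-context")
  .set("spark.driver.allowMultipleContexts", "true")
val innerSc = new SparkContext(innerConf)
val innerRdd = innerSc.parallelize(Seq("a", "b", "c"))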

Answer 1:

As far as I can tell it is not possible, and it is hardly a matter of serialization or of support for multiple Spark contexts. The fundamental limitation is the core Spark architecture. Since the Spark context is maintained by the driver and tasks are executed on the workers, creating an RDD from inside a task would require pushing changes from the workers back to the driver. I am not saying it is technically impossible, but the whole idea seems rather cumbersome.

Creating a Spark context from inside a task looks even worse. First of all, it would mean that the context is created on the workers, which for all practical purposes don't communicate with each other. Each worker would get its own context, which could operate only on data that is accessible on that worker. Finally, preserving worker state is not part of the contract, so any context created inside a task should simply be garbage collected after the task is finished.

If handling the problem using multiple jobs is not an option, you can try to use mapPartitions like this:

val rdd = sc.parallelize(1 to 100)

// Each partition is folded into a single Map of mutable buffers,
// so every partition is emitted as one element of `tmp`.
val tmp = rdd.mapPartitions(iter => {
  val results = Map(
    "odd"  -> scala.collection.mutable.ArrayBuffer.empty[Int],
    "even" -> scala.collection.mutable.ArrayBuffer.empty[Int]
  )

  for (i <- iter) {
    if (i % 2 != 0) results("odd") += i
    else results("even") += i
  }

  Iterator(results)
})

// Flatten the per-partition buffers back into plain RDDs.
val odd  = tmp.flatMap(_("odd"))
val even = tmp.flatMap(_("even"))
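
One thing to keep in mind: odd and even are both derived from tmp, so each action recomputes the mapPartitions pass unless tmp is cached. A minimal usage sketch, assuming the definitions above:

// Usage sketch: persist the intermediate RDD so the partition pass
// runs only once, then materialize both halves on the driver.
tmp.cache()
println(odd.collect().mkString(", "))    // e.g. 1, 3, 5, ...
println(even.collect().mkString(", "))   // e.g. 2, 4, 6, ...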