Spark: Cannot add RDD elements into a mutable HashMap

Published 2019-07-23 00:09

Question:

I have the following code, where rddMap is of type org.apache.spark.rdd.RDD[(String, (String, String))] and myHashMap is a scala.collection.mutable.HashMap.

I did .saveAsTextFile("temp_out") to force the evaluation of rddMap.map.

However, even though println(" t " + t) prints output, afterwards myHashMap still contains only the one element I put in manually at the beginning, ("test1", ("10", "20")). None of the elements from rddMap end up in myHashMap.

Code snippet:

import scala.collection.mutable.HashMap

val myHashMap = new HashMap[String, (String, String)]
myHashMap.put("test1", ("10", "20"))
rddMap.map { t =>
  println(" t " + t)
  myHashMap.put(t._1, t._2)
}.saveAsTextFile("temp_out")

println(rddMap.count)
println(myHashMap.toString)

Why can't I put the elements from rddMap into myHashMap?

Answer 1:

Here is a working example of what you want to accomplish.

val rddMap = sc.parallelize(Map("A" -> ("v", "v"), "B" -> ("d","d")).toSeq)
// Collects all the data in the RDD and converts the data to a Map
val myMap = rddMap.collect().toMap
myMap.foreach(println)

Output:

(A,(v,v))  
(B,(d,d))

Here is code similar to what you've posted (with newHashMap initialized the same way as your myHashMap):

val newHashMap = new HashMap[String, (String, String)]
newHashMap.put("test1", ("10", "20"))

rddMap.map { t =>
  println("t" + t)
  newHashMap.put(t._1, t._2)
  println(newHashMap.toString)
}.collect

Here is the output of the above code in the Spark shell:

t(A,(v,v))  
Map(A -> (v,v), test1 -> (10,20))  
t(B,(d,d))  
Map(test1 -> (10,20), B -> (d,d))

To me it looks like Spark copies your HashMap and adds the elements to that copy, not to the original on the driver.



Answer 2:

What you are trying to do is not really supported in Spark today.

Note that every user-defined function (e.g., whatever you put inside a map()) is a closure that gets serialized and shipped to each executor.

Therefore everything you have inside this map() gets serialized and shipped around:

.map{ t =>
  println(" t " + t)
  myHashMap.put(t._1, t._2)
}

Essentially, your myHashMap will be copied to each executor, and each executor will update its own copy of that HashMap. This is why, at the end of the execution, the myHashMap you have in your driver never gets changed. (The driver is the JVM that manages/orchestrates your Spark jobs. It's where you define your SparkContext.)
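A minimal sketch of the same pitfall with a plain counter (counter is just for illustration, reusing the rddMap from the question):

var counter = 0

// The closure captures counter; each task works on its own deserialized copy of it
rddMap.foreach { _ => counter += 1 }

// On a cluster this prints 0: the increments happened on the executors' copies,
// never on the driver's variable
println("counter = " + counter)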

In order to push structures defined in the driver to all executors you need to broadcast them (see link here). Note that broadcast variables are read-only, so again, broadcasting will not help you here.
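For completeness, here is a rough sketch of what broadcasting a driver-side map for read-only lookups typically looks like (the names lookup, lookupBc and enriched are just illustrative):

// A read-only lookup table defined on the driver
val lookup = Map("test1" -> ("10", "20"))
val lookupBc = sc.broadcast(lookup)

// Executors can read lookupBc.value, but they cannot push updates back to the driver
val enriched = rddMap.map { case (k, v) =>
  (k, v, lookupBc.value.get(k))
}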

Another way is to use accumulators, but I feel these are tuned more towards summarizing numeric values, like computing sums, maxes, mins, etc. Maybe you can take a look at creating a custom accumulator that extends AccumulatorParam. See link here.
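A rough sketch of that idea, assuming the older AccumulatorParam API the answer mentions (the names HashMapParam and mapAcc are made up for illustration; newer Spark versions replace this with AccumulatorV2, but the shape is the same):

import org.apache.spark.AccumulatorParam
import scala.collection.mutable.HashMap

// Hypothetical param describing how to create an empty map and how to merge two maps
object HashMapParam extends AccumulatorParam[HashMap[String, (String, String)]] {
  def zero(initial: HashMap[String, (String, String)]): HashMap[String, (String, String)] =
    new HashMap[String, (String, String)]

  def addInPlace(m1: HashMap[String, (String, String)],
                 m2: HashMap[String, (String, String)]): HashMap[String, (String, String)] =
    m1 ++= m2
}

val mapAcc = sc.accumulator(new HashMap[String, (String, String)])(HashMapParam)

// Each element is wrapped in a one-entry map and merged into the accumulator on the executors
rddMap.foreach { t => mapAcc += HashMap(t._1 -> t._2) }

// The merged value is only reliable on the driver, after an action has run
println(mapAcc.value)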

Coming back to the original question: if you want to collect values on your driver, currently the best way to do this is to transform your RDD until it becomes a small, manageable collection of elements, and then collect() that final/small RDD.
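Applied to the original snippet, that could look roughly like this, provided rddMap is small enough to fit in the driver's memory:

import scala.collection.mutable.HashMap

val myHashMap = new HashMap[String, (String, String)]
myHashMap.put("test1", ("10", "20"))

// collect() returns an Array[(String, (String, String))] on the driver,
// so the driver-side map can be updated directly
myHashMap ++= rddMap.collect()

println(myHashMap)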