I have the following code, where rddMap is of type org.apache.spark.rdd.RDD[(String, (String, String))] and myHashMap is a scala.collection.mutable.HashMap. I call .saveAsTextFile("temp_out") to force the evaluation of rddMap.map. However, even though println(" t " + t) prints things, afterwards myHashMap still contains only the one element I put in manually at the beginning, ("test1", ("10", "20")). Nothing from rddMap ends up in myHashMap.
Snippet code:
import scala.collection.mutable.HashMap

val myHashMap = new HashMap[String, (String, String)]
myHashMap.put("test1", ("10", "20"))
rddMap.map { t =>
  println(" t " + t)
  myHashMap.put(t._1, t._2)
}.saveAsTextFile("temp_out")
println(rddMap.count)
println(myHashMap.toString)
println(rddMap.count)
println(myHashMap.toString)
Why can I not put the elements from rddMap into myHashMap?
What you are trying to do is not really supported in Spark today.
Note that every user-defined function (e.g., whatever you put inside a map()) is a closure that gets serialized and pushed to each executor. Therefore, everything you reference inside this map() gets serialized and shipped around: essentially, your myHashMap is copied to each executor, and each executor updates its own copy of that HashMap. This is why, at the end of the execution, the myHashMap in your driver never changes. (The driver is the JVM that manages/orchestrates your Spark jobs; it is where you define your SparkContext.)

In order to push structures defined in the driver to all executors you need to broadcast them (see link here). Note that broadcast variables are read-only, so using broadcasts will not help you here either.
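For illustration, here is a minimal sketch of what broadcast is actually for, namely a read-only lookup on the executors (the lookup contents below are hypothetical):

val lookup = sc.broadcast(Map("test1" -> ("10", "20")))
// Each executor can read lookup.value locally...
val filtered = rddMap.filter { t => lookup.value.contains(t._1) }
// ...but there is no way to push updates to lookup back to the driver.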
Another way is to use Accumulators, but I feel that these are more tuned towards summarizing numeric values, like computing a sum, max, or min. Maybe you can take a look at creating a custom accumulator that extends AccumulatorParam. See link here.
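A rough sketch of that idea, assuming the Spark 1.x API (AccumulatorParam was later deprecated in favour of AccumulatorV2):

import scala.collection.mutable.HashMap
import org.apache.spark.AccumulatorParam

// Merges the per-executor HashMaps into one value on the driver.
object HashMapParam extends AccumulatorParam[HashMap[String, (String, String)]] {
  def zero(initial: HashMap[String, (String, String)]) =
    new HashMap[String, (String, String)]
  def addInPlace(m1: HashMap[String, (String, String)],
                 m2: HashMap[String, (String, String)]) = m1 ++= m2
}

val acc = sc.accumulator(new HashMap[String, (String, String)])(HashMapParam)
rddMap.foreach { t => acc += HashMap(t._1 -> t._2) }
// acc.value, read on the driver, now holds the merged entries.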
Coming back to the original question: if you want to collect values to your driver, currently the best way is to transform your RDDs until they become a small and manageable collection of elements, and then collect() that final/small RDD.

Here is a working example of what you want to accomplish:
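A minimal sketch, assuming a SparkContext sc is in scope (e.g. in the Spark shell); the contents of rddMap here are hypothetical sample data:

import scala.collection.mutable.HashMap

val rddMap = sc.parallelize(Seq(
  ("test2", ("20", "30")),
  ("test3", ("30", "40"))))

val myHashMap = new HashMap[String, (String, String)]
myHashMap.put("test1", ("10", "20"))

// collect() brings the (small) RDD back to the driver, where the real
// myHashMap lives, so these puts are visible afterwards.
rddMap.collect().foreach { t => myHashMap.put(t._1, t._2) }

println(myHashMap)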
Output (approximately; HashMap iteration order is not guaranteed):
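Map(test2 -> (20,30), test1 -> (10,20), test3 -> (30,40))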
Here is similar code to what you've posted, using the same hypothetical rddMap as above:
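import scala.collection.mutable.HashMap

val myHashMap = new HashMap[String, (String, String)]
myHashMap.put("test1", ("10", "20"))

// The closure below is serialized and executed by the tasks; each task
// updates its own deserialized copy of myHashMap.
rddMap.map { t =>
  println(" t " + t)
  myHashMap.put(t._1, t._2)
  t // keep the element so something meaningful is written to temp_out
}.saveAsTextFile("temp_out") // fails if temp_out already exists

println(rddMap.count)
println(myHashMap.toString)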
And here is the output of the above code in the Spark shell (approximately; the " t ..." lines are printed by the tasks and show up on the shell console only when running with a local master):
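 t (test2,(20,30))
 t (test3,(30,40))
2
Map(test1 -> (10,20))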
To me it looks like Spark copies your HashMap and adds the elements to the copied map, never to the one in your driver.