Nesting parallelizations in Spark? What's the right approach?

Posted 2020-04-17 02:23

Question:

NESTED PARALLELIZATIONS?

Let's say I am trying to do the equivalent of "nested for loops" in Spark. In a regular language, say I have a routine in the inner loop that estimates Pi the way the Spark Pi example does (see Estimating Pi):

iLimit = 1000; jLimit = 10^6; counter = 0.0;

for (int i = 0; i < iLimit; i++)
    for (int j = 0; j < jLimit; j++)
        counter += PiEstimator();

estimateOfAllAverages = counter / iLimit;

Can I nest parallelize calls in Spark? I am trying this and have not worked out the kinks yet. I'd be happy to post errors and code, but I think I am asking a more conceptual question about whether this is the right approach in Spark.

I can already parallelize a single Spark example / Pi estimate; now I want to do that 1000 times to see if it converges on Pi. (This relates to a larger problem we are trying to solve; if something closer to an MVCE is needed, I'd be happy to add it.)
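A single estimate here follows the shape of the standard Spark Pi example; roughly something like this sketch (the names pi_estimate and num_samples are placeholders, not the actual code referred to above):

import random

def pi_estimate(sc, num_samples):
    # One Monte Carlo Pi estimate, parallelized over num_samples random points.
    def inside(_):
        x, y = random.random(), random.random()
        return 1 if x * x + y * y < 1 else 0

    count = sc.parallelize(range(num_samples)).map(inside).reduce(lambda a, b: a + b)
    return 4.0 * count / num_samples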

BOTTOM LINE QUESTION I just need someone to answer directly: Is this the right approach, to use nested parallelize calls? If not, please advise something specific, thanks! Here is pseudo-code for what I think would be the right approach:

// use an accumulator to keep track of each Pi estimate result
sparkContext.parallelize(arrayOf1000, slices).map { // outer function call
    sparkContext.parallelize(arrayOf10^6, slices).map {
        // do the 10^6 thing here and update the accumulator with each result
    }
}

// take the average of the accumulator to see if all 1000 Pi estimates converge on Pi

BACKGROUND: I had asked this question and got a general answer, but it did not lead to a solution. After some waffling I decided to post a new question with a different characterization. I also tried to ask this on the Spark user mailing list, but no dice there either. Thanks in advance for any help.

Answer 1:

This is not even possible, as SparkContext is not serializable. If you want a nested for loop, then your best option is to use cartesian:

val nestedForRDD = rdd1.cartesian(rdd2)
nestedForRDD.map { case (rdd1TypeVal, rdd2TypeVal) =>
  // Do your inner-nested evaluation code here
}

Keep in mind that, just like a double for loop, this comes at a size cost: the cartesian product contains rdd1.count() * rdd2.count() elements.
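Applied to the Pi-convergence case from the question, a PySpark sketch of the same cartesian idea might look like the following (pi_sample and the sizes are illustrative assumptions; the full product here would have 1000 * 10^6 elements, which is exactly the size cost mentioned above):

import random
from operator import add

rdd1 = sc.parallelize(range(1000))      # one element per outer estimate
rdd2 = sc.parallelize(range(10 ** 6))   # one element per Monte Carlo sample

def pi_sample(pair):
    # each cartesian pair is (outer index, sample index); test one random point
    estimate_id, _ = pair
    x, y = random.random(), random.random()
    return (estimate_id, 1 if x * x + y * y < 1 else 0)

# one Pi estimate per outer index, each based on 10^6 samples
hits = rdd1.cartesian(rdd2).map(pi_sample).reduceByKey(add)
estimates = hits.mapValues(lambda h: 4.0 * h / 10 ** 6)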



Answer 2:

In the Pi example, you can get the same answer as the nested for loop by doing a single loop through the process i * j times, summing over all of them, and then dividing by j at the end. If there are steps you want to apply in the outer loop, do them within the loop, but create distinct groups by assigning a specific key to each inner-loop group. Without knowing what kinds of things you want to do in the outer loop, it's hard to give an example here.

For the simple case of just averaging to improve convergence, it's relatively easy. Instead of doing the nested loop, just make an RDD with i * j elements and then apply the function to each element.

In PySpark this might look like the following (f is whatever function you want to apply; remember that each element in the RDD is passed to it, so define your f with an input argument even if you don't use it in your function):

from operator import add
from pyspark.mllib.random import RandomRDDs

x = RandomRDDs.uniformRDD(sc, i * j)
function_values = x.map(f)

sum_of_values = function_values.reduce(add)
averaged_value = sum_of_values / j  # if you are only averaging over the outer loop

If you want to perform actions in the outer loop, I'd assign an index (zipWithIndex) and then create a key using the index modulo j. Each distinct key would then be a single virtual inner-loop cycle, and you can use operators like aggregateByKey, foldByKey, or reduceByKey to perform actions only on those records. This will probably take a bit of a performance hit if the different keys are distributed to different partitions.
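A PySpark sketch of that keying idea, reusing the x, f, and j names from above (my illustration, not a tuned implementation):

from operator import add

# zipWithIndex yields (element, index) pairs; key by index % j so each key
# corresponds to one virtual inner-loop cycle, then aggregate per key.
keyed = x.zipWithIndex().map(lambda pair: (pair[1] % j, f(pair[0])))
per_group_sums = keyed.reduceByKey(add)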

An alternative would be to repartition the RDD onto j partitions and then use foreachPartition to apply a function to each partition.
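For example, something along these lines (handle_partition is a hypothetical per-partition function; note that foreachPartition is an action and returns nothing to the driver, so use mapPartitions instead if you need results back):

def handle_partition(iterator):
    # one partition stands in for one inner-loop group; do the per-group
    # work on its elements here (write results out or use an accumulator)
    for value in iterator:
        pass  # ... process value ...

x.repartition(j).foreachPartition(handle_partition)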

A third option would be to run the inner loop j times in parallel, concatenate the results into one distributed file, and then do the outer-loop operations after reading it back into Spark.



Answer 3:

No. You can't.

SparkContext is only accessible from the Spark driver node. The inner parallelize() calls would try to use the SparkContext from the worker nodes, which do not have access to it.