In Apache Spark, can I easily repeat/nest a SparkContext?

Posted 2019-03-05 23:11

Question:

I am trying to model a genetics problem we are trying to solve, building up to it in steps. I can successfully run the PiAverage example from the Spark examples. That example "throws darts" at a circle (10^6 in our case) and counts the number that "land in the circle" to estimate Pi.

Let's say I want to repeat that process 1000 times (in parallel) and average all of those estimates. I am trying to see the best approach; it seems like there would have to be two calls to parallelize? Nested calls? Is there not a way to chain map or reduce calls together? I can't see it.

I want to know whether something like the idea below is wise. I thought of tracking the resulting estimates using an accumulator. jsc is my SparkContext; the full code of a single run is at the end of the question. Thanks for any input!

Accumulator<Double> accum = jsc.accumulator(0.0);

// make a list 1000 long to pass to parallelize (no for loops in Spark, right?)
List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);

// pass this "dummy list" to parallelize, which then 
// calls a pieceOfPI method to produce each individual estimate  
// accumulating the estimates. PieceOfPI would contain a 
// parallelize call too with the individual test in the code at the end
jsc.parallelize(numberOfEstimates).foreach(accum.add(pieceOfPI(jsc, numList, slices, HOW_MANY_ESTIMATES)));

// get the value of the total of PI estimates and print their average
double totalPi = accum.value();

// output the average of averages
System.out.println("The average of " + HOW_MANY_ESTIMATES + " estimates of Pi is " + totalPi / HOW_MANY_ESTIMATES);

None of the matrix-related or other answers I see on SO seem to address this specific question. I have done several searches, but I am not seeing how to do this without "parallelizing the parallelization." Is that a bad idea?

(And yes, I realize that mathematically I could just do more estimates and effectively get the same result. :) I'm trying to build the structure my boss wants. Thanks again!

I have put my entire single-test program here if that helps, sans an accumulator I was testing out. The core of this would become PieceOfPI():

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.Accumulable;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.SparkConf;

public class PiAverage implements Serializable {

public static void main(String[] args) {

    PiAverage pa = new PiAverage();
    pa.go();

}

public void go() {

    // this should be a parameter, as all of these finals should be
    // int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
    final int SLICES = 16;

    // how many "darts" are thrown at the circle to get one single Pi estimate
    final int HOW_MANY_DARTS = 1000000;

    // how many "dartboards" to collect to average the Pi estimate, which we hope converges on the real Pi
    final int HOW_MANY_ESTIMATES = 1000;

    SparkConf sparkConf = new SparkConf().setAppName("PiAverage")
        .setMaster("local[4]");

    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // setup "dummy" ArrayList of size HOW_MANY_DARTS -- how many darts to throw
    List<Integer> throwsList = new ArrayList<Integer>(HOW_MANY_DARTS);
    for (int i = 0; i < HOW_MANY_DARTS; i++) {
        throwsList.add(i);
    }

    // setup "dummy" ArrayList of size HOW_MANY_ESTIMATES
    List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);
    for (int i = 0; i < HOW_MANY_ESTIMATES; i++) {
        numberOfEstimates.add(i);
    }

    JavaRDD<Integer> dataSet = jsc.parallelize(throwsList, SLICES);

    // count how many darts landed inside the unit circle
    long insideCircleCount = dataSet.filter(new Function<Integer, Boolean>() {
        public Boolean call(Integer i) {
            double x = Math.random();
            double y = Math.random();
            return x * x + y * y < 1;
        }
    }).count();

    System.out.println(
            "The estimate of Pi from " + HOW_MANY_DARTS + " darts is " + 4 * insideCircleCount / (double) HOW_MANY_DARTS);

    jsc.stop();
    jsc.close();
}
}

Answer 1:

Let me start with your "background question". Transformation operations like map, join, groupBy, etc. fall into two categories: those that require a shuffle of data as input from all the partitions, and those that don't. Operations like groupBy and join require a shuffle, because you need to bring together all records with the same keys from all of the RDD's partitions (think of how SQL JOIN and GROUP BY operations work). On the other hand, map, flatMap, filter, etc. don't require shuffling, because each one can work directly on the partition produced by the previous step. They work on single records at a time, not on groups of records with matching keys. Hence, no shuffling is necessary.
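
As a quick illustration of the two categories, here is a rough sketch of my own (reusing the jsc from your program, plus JavaPairRDD, PairFunction, scala.Tuple2, and java.util.Arrays, which are not in your imports): the filter runs independently on each partition, while the groupByKey forces a shuffle.

JavaRDD<String> lines = jsc.parallelize(Arrays.asList("a 1", "b 2", "a 3"));

// narrow: each partition is processed on its own, no shuffle
JavaRDD<String> nonEmpty = lines.filter(new Function<String, Boolean>() {
    public Boolean call(String s) {
        return !s.isEmpty();
    }
});

// wide: records with the same key must be brought together, so Spark shuffles
JavaPairRDD<String, Iterable<Integer>> grouped = nonEmpty
    .mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
            String[] parts = s.split(" ");
            return new Tuple2<String, Integer>(parts[0], Integer.parseInt(parts[1]));
        }
    })
    .groupByKey();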

This background is necessary to understand that an "extra map" does not add significant overhead. A sequence of operations like map, flatMap, etc. is "squashed" together into a "stage" (shown when you look at the details for a job in the Spark web UI), so that only one RDD is materialized: the one at the end of the stage.
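
To see that pipelining concretely, here is a small sketch of my own (reusing throwsList and SLICES from your program): the map and filter below are both narrow, so the whole chain runs as a single stage, and only the final count triggers any work.

JavaRDD<Integer> nums = jsc.parallelize(throwsList, SLICES);

long evens = nums
    .map(new Function<Integer, Integer>() {
        public Integer call(Integer i) {
            return i + 1;
        }
    })
    .filter(new Function<Integer, Boolean>() {
        public Boolean call(Integer i) {
            return i % 2 == 0;
        }
    })
    .count();   // the action; the web UI shows one stage for the whole chain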

On to your first question: I wouldn't use an accumulator for this. Accumulators are intended for "side-band" data, like counting how many bad lines you parsed. In your case, you might use them to count how many (x,y) pairs landed inside vs. outside the unit circle, for example.
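
Here is a rough sketch of that side-band use (mine, built on the dataSet RDD and the old Accumulator API your program already imports): the real result still comes from count(), while the accumulators only tally the inside/outside split for diagnostics.

final Accumulator<Integer> inside = jsc.accumulator(0);
final Accumulator<Integer> outside = jsc.accumulator(0);

long hits = dataSet.filter(new Function<Integer, Boolean>() {
    public Boolean call(Integer i) {
        double x = Math.random();
        double y = Math.random();
        boolean in = x * x + y * y < 1;
        if (in) {
            inside.add(1);    // side-band bookkeeping only
        } else {
            outside.add(1);
        }
        return in;
    }
}).count();

System.out.println("inside: " + inside.value() + ", outside: " + outside.value());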

The JavaSparkPi example in the Spark distribution is about as good as it gets, and you should study why it works: it's the right dataflow model for Big Data systems. You could also use "aggregators": in the Javadocs, click the "index" and look at the agg, aggregate, and aggregateByKey functions. They provide more flexibility than a map followed by a reduce, so they are worth knowing, but they are not necessarily any clearer, and they are not needed here.
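
For reference, a minimal sketch of what such an aggregate call could look like on your dataSet RDD (my illustration, not code from the Spark docs): the first Function2 folds each record into a per-partition count and the second merges the per-partition counts.

int insideCount = dataSet.aggregate(0,
    new Function2<Integer, Integer, Integer>() {       // seqOp: per record within a partition
        public Integer call(Integer acc, Integer ignored) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y < 1) ? acc + 1 : acc;
        }
    },
    new Function2<Integer, Integer, Integer>() {       // combOp: merge partition results
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    });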

The problem with your code is that you are effectively trying to tell Spark what to do, rather than expressing your intent and letting Spark optimize how it does it for you.
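
In that spirit, here is a hedged sketch of my own (reusing HOW_MANY_ESTIMATES, HOW_MANY_DARTS, and SLICES from your program) of how the whole "1000 estimates of 10^6 darts each" computation can be expressed as a single job, with no nested parallelize and no second SparkContext: each RDD element stands for one estimate, the dart loop for that estimate runs inside a plain map on the workers, and a reduce sums the results on the driver.

List<Integer> estimateIds = new ArrayList<Integer>(HOW_MANY_ESTIMATES);
for (int i = 0; i < HOW_MANY_ESTIMATES; i++) {
    estimateIds.add(i);
}

JavaRDD<Integer> estimates = jsc.parallelize(estimateIds, SLICES);

double sumOfEstimates = estimates.map(new Function<Integer, Double>() {
    public Double call(Integer id) {
        int inside = 0;
        for (int d = 0; d < HOW_MANY_DARTS; d++) {
            double x = Math.random();
            double y = Math.random();
            if (x * x + y * y < 1) {
                inside++;
            }
        }
        return 4.0 * inside / HOW_MANY_DARTS;   // one Pi estimate
    }
}).reduce(new Function2<Double, Double, Double>() {
    public Double call(Double a, Double b) {
        return a + b;
    }
});

System.out.println("Average of " + HOW_MANY_ESTIMATES + " estimates: " + sumOfEstimates / HOW_MANY_ESTIMATES);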

Finally, I suggest you buy and study O'Reilly's "Learning Spark". It does a good job explaining the internal details, like staging, and it shows lots of example code you can use, too.