In my Spark Java program I need to read a config file and populate a HashMap, which I need to publish as a broadcast variable so that it is available on all the data nodes.
I need to get the value of this broadcast variable in my CustomInputFormat class, which is going to run on the data nodes. How can I specify in my CustomInputFormat class that it should read from this specific broadcast variable, given that the broadcast variable is declared in my driver program?
I am adding some code to explain it in more detail:
In scenario 1 I am using the broadcast variable in the driver program itself, i.e. the variable is used in the same class where it is declared. Here I can use the Broadcast.value() method:
> final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
> JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
>     new PairFunction<Tuple2<String, Integer>, String, Integer>() {
>         public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
>             String sign = callSignCount._1();
>             String country = lookupCountry(sign, signPrefixes.value());
>             return new Tuple2(country, callSignCount._2());
>         }
>     }).reduceByKey(new SumInts());
In scenario 2 I am going to use the broadcast variable inside my custom InputFormat class.
Driver program:
> final JavaSparkContext sc = new JavaSparkContext(
>     sConf.setAppName("ParserSpark").setMaster("yarn-cluster"));
> Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
>
> JavaPairRDD<NullWritable, ArrayList<Record>> baseRDD =
>     sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class,
>         ArrayList.class, conf);
InputFormat.class:
> public class InputFormat extends FileInputFormat {
>
>     @Override
>     public RecordReader<NullWritable, ArrayList<Record>> createRecordReader(
>             InputSplit split, TaskAttemptContext context)
>             throws IOException, InterruptedException {
>         // I want to get the Broadcast Variable here -- how will I do it?
>
>         RecordReader reader = new RecordReader();
>         reader.initialize(split, context);
>         return reader;
>     }
>
>     @Override
>     protected boolean isSplitable(JobContext context, Path file) {
>         return false;
>     }
> }
I ran into this myself recently. It ended up being rather simple, actually (after a few hours and then an... a-ha!).
Create a new Configuration, set your vars on it, and pass it to a slightly different overload of the newAPIHadoopFile function.
From the driver program (using Scala here):
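Something along these lines (a sketch: the key name "my.broadcast.value" and the variable myValueAsString are placeholders, and InputFormat/Record are the classes from the question):

> import java.util.ArrayList
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.io.NullWritable
>
> // Stash whatever the tasks need in a plain Hadoop Configuration.
> val hadoopConf = new Configuration()
> hadoopConf.set("my.broadcast.value", myValueAsString)
>
> // The newAPIHadoopFile overload that takes a Configuration ships it
> // with the job, so every task can read it back out.
> val baseRDD = sc.newAPIHadoopFile(
>   args(2),
>   classOf[InputFormat],
>   classOf[NullWritable],
>   classOf[ArrayList[Record]],
>   hadoopConf)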
From your InputFormat or RecordReader, or wherever you have a context (Java this time):
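For example, in createRecordReader of the InputFormat above (CustomRecordReader is a stand-in for your own reader class):

> @Override
> public RecordReader<NullWritable, ArrayList<Record>> createRecordReader(
>         InputSplit split, TaskAttemptContext context)
>         throws IOException, InterruptedException {
>     // The Configuration set in the driver comes back out of the context.
>     String myValue = context.getConfiguration().get("my.broadcast.value");
>
>     CustomRecordReader reader = new CustomRecordReader(myValue);
>     reader.initialize(split, context);
>     return reader;
> }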
or maybe from inside your RecordReader's initialize(), which also receives a TaskAttemptContext:
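> @Override
> public void initialize(InputSplit split, TaskAttemptContext context)
>         throws IOException, InterruptedException {
>     // Same trick, executor side: read the value back off the job Configuration.
>     String myValue = context.getConfiguration().get("my.broadcast.value");
>     // ... use myValue while setting up the reader
> }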
You would create the broadcast var on the driver with

> val bcVariable = sc.broadcast(myVariableToBroadcast)

and access it later with bcVariable.value
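For example (a minimal sketch; the lookup map and the sample data below are made up):

> val bcVariable = sc.broadcast(Map("304" -> "US", "94" -> "IN")) // e.g. your HashMap
>
> val countries = sc.parallelize(Seq("304", "94", "7"))
>   .map(prefix => (prefix, bcVariable.value.getOrElse(prefix, "unknown")))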