I need to aggregate the results from all the reduce tasks. Each reduce task finds the sum and count of a value, and I need to add up all the sums and counts to find the final average.
I tried using conf.setInt in reduce, but when I try to access the value from the main function it fails.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

class Main {

    public static class MyReducer
            extends Reducer<Text, Text, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int i = 0;
            // Read the shared values from the job configuration.
            int fd = context.getConfiguration().getInt("fd", -1);
            int fc = context.getConfiguration().getInt("fc", -1);
            // When I check fd and fc here they are fine: they appear to be shared
            // across all reduce tasks, and every reduce task sees the updated value.
            // Only the main function doesn't seem to have access to them.
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("fc", 5);
        Job job = new Job(conf, "Flight Data");
        job.setJarByClass(FlightData.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(GroupComparator.class);
        job.setSortComparatorClass(KeyComparator.class);
        job.setNumReduceTasks(10);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        int flightCount = job.getConfiguration().getInt("fc", -1);
        int flightDelay = job.getConfiguration().getInt("fd", -1);
        // Here, when I access fc and fd, I get back 5 & 5.
        System.out.println("Final " + flightCount + " " + flightDelay + " " + flightDelay / flightCount);
    }
}
Override the run() method of the mapper and reducer using the new org.apache.hadoop.mapreduce API. In these methods you can emit the accumulated sum/count from each mapper or reducer. You would also need to limit the reducer count to 1 so that a single reducer produces the global sum of all the sums generated by the multiple mappers.
See the code below for more clarity:
You may very well map this approach to your problem.
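To make the arithmetic behind this approach concrete, here is a minimal sketch in plain Java with no Hadoop dependency. It is not the code from this answer. The class and method names (PartialAverage, Partial, accumulate, globalAverage) are hypothetical. accumulate stands in for what each task would emit once at the end of its run, and globalAverage stands in for what the single final reducer would do with those partial results.

```java
import java.util.Arrays;
import java.util.List;

public class PartialAverage {

    /** Hypothetical partial result one task would emit: a sum and a count. */
    static final class Partial {
        final long sum;
        final long count;

        Partial(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** Stand-in for one task: accumulate locally, emit a single (sum, count) pair. */
    static Partial accumulate(List<Integer> delays) {
        long sum = 0;
        for (int delay : delays) {
            sum += delay;
        }
        return new Partial(sum, delays.size());
    }

    /** Stand-in for the single final reducer: add all sums and counts, then divide. */
    static double globalAverage(List<Partial> partials) {
        long sum = 0;
        long count = 0;
        for (Partial p : partials) {
            sum += p.sum;
            count += p.count;
        }
        if (count == 0) {
            throw new IllegalArgumentException("no records to average");
        }
        return (double) sum / count;
    }

    public static void main(String[] args) {
        Partial first = accumulate(Arrays.asList(10, 20, 30));
        Partial second = accumulate(Arrays.asList(40));
        // Total sum 100 over 4 records.
        System.out.println(globalAverage(Arrays.asList(first, second))); // prints 25.0
    }
}
```

Carrying the count along with the sum matters: averaging the two per-task averages (20 and 40) would give 30, not the correct 25.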
Found the solution: I used counters.
http://diveintodata.org/2011/03/15/an-example-of-hadoop-mapreduce-counter/
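For context on why counters work where conf.setInt did not: the Configuration is copied out to the tasks when the job is submitted, so values set inside a task never travel back to the driver. Counters, by contrast, are aggregated by the framework and can be read from the Job once it finishes. Below is a rough sketch of that idea, not my exact code. It assumes Hadoop is on the classpath and that each reducer value is an integer delay in text form. The names FlightCounterSketch, FlightCounter, CountingReducer, average and averageDelay are made up for illustration.

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class FlightCounterSketch {

    /** Hypothetical counter names; Hadoop sums each one across all tasks. */
    enum FlightCounter { DELAY_SUM, FLIGHT_COUNT }

    public static class CountingReducer
            extends Reducer<Text, Text, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            long count = 0;
            for (Text value : values) {
                // Assumption: every value is an integer delay.
                sum += Long.parseLong(value.toString().trim());
                count++;
            }
            // Publish this key's partial totals to the job-wide counters.
            context.getCounter(FlightCounter.DELAY_SUM).increment(sum);
            context.getCounter(FlightCounter.FLIGHT_COUNT).increment(count);
        }
    }

    /** Pure helper: average from totals, 0.0 when there are no records. */
    static double average(long sum, long count) {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    /** Call from the driver only after the job has completed. */
    static double averageDelay(Job job) throws IOException {
        long sum = job.getCounters().findCounter(FlightCounter.DELAY_SUM).getValue();
        long count = job.getCounters().findCounter(FlightCounter.FLIGHT_COUNT).getValue();
        return average(sum, count);
    }
}
```

In main, the counters would be read after job.waitForCompletion(true) returns, for example with double avg = FlightCounterSketch.averageDelay(job);. Reading them before the job has run would only show the initial values.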
public class FlightData {
}