mapreduce count example

Posted 2020-08-26 10:46

Question:

My question is about mapreduce programming in java.

Suppose I have the WordCount.java example, a standard MapReduce program. I want the map function to collect some information and send the reduce function pairs of the form <slaveNode_id, some_info_collected>,

so that I can know which slave node collected which data. Any ideas how?

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          output.collect(word, one);
        }
      }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
      }
    }

    public static void main(String[] args) throws Exception {
      JobConf conf = new JobConf(WordCount.class);
      conf.setJobName("wordcount");

      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);

      conf.setMapperClass(Map.class);
      conf.setCombinerClass(Reduce.class);
      conf.setReducerClass(Reduce.class);

      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      JobClient.runJob(conf);
    }
}

Thank you!!

Answer 1:

What you are asking is to let the application (your map-reduce job) know about the infrastructure it runs on.

In general the answer is that your application doesn't need this information. Each call to the Mapper and each call to the Reducer can be executed on a different node, or all on the same node. The beauty of MapReduce is that the outcome is the same either way, so for your application it simply doesn't matter.

As a consequence, the API doesn't have features to support this request of yours.

Have fun learning Hadoop :)


P.S. The only way I can think of (which is nasty, to say the least) is to include a system call of some sort in the Mapper and ask the underlying OS for its name, properties, etc. That kind of construct would make your application very non-portable; i.e. it won't run on Hadoop on Windows or Amazon.
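A slightly less nasty variant of that idea: the JVM itself can report the local hostname via `java.net.InetAddress`, without shelling out to the OS. A minimal sketch (the class and method names here are just for illustration, not part of any Hadoop API):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostName {

  // Ask the JVM (not an external OS command) for the local machine's hostname.
  // Inside a Mapper, this string could be used to build the output key.
  static String localHost() {
    try {
      return InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      // Resolution can fail on misconfigured nodes; fall back to a marker value.
      return "unknown-host";
    }
  }

  public static void main(String[] args) {
    System.out.println(localHost());
  }
}
```

Whether the resulting name is meaningful still depends on how the cluster's DNS/hosts files are configured, so this only softens the portability concern rather than removing it.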



Answer 2:

Wordcount is the wrong example for you. You simply want to merge all the information per node, which inverts what wordcount does.

Basically you just emit your slaveNode_id as an IntWritable (if that is possible) and the information as Text.

public static class Map extends MapReduceBase
    implements Mapper<LongWritable, Text, IntWritable, Text> {

  private Text word = new Text();

  public void map(LongWritable key, Text value,
      OutputCollector<IntWritable, Text> output, Reporter reporter)
      throws IOException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      // you have to split your data here: ID and value
      IntWritable id = new IntWritable(YOUR_ID_HERE);

      output.collect(id, word);
    }
  }
}

And the reducer would go the same way:

public static class Reduce extends MapReduceBase
    implements Reducer<IntWritable, Text, IntWritable, Text> {

  public void reduce(IntWritable key, Iterator<Text> values,
      OutputCollector<IntWritable, Text> output, Reporter reporter)
      throws IOException {
    // now you have all the values for one slave ID as the key;
    // do whatever you like with them... (note: the old API hands you a
    // plain Iterator, so iterate with hasNext()/next(), not for-each)
    while (values.hasNext()) {
      output.collect(key, values.next());
    }
  }
}
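One more thing to watch out for: with the key and value types swapped like this, the driver from the question must be updated to match, and the word-count Reduce should no longer be registered as a combiner (an identity-style reducer gains nothing from combining, and the types wouldn't line up anyway). A rough sketch of the relevant JobConf lines, assuming the Map/Reduce classes above:

```java
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("nodeinfo");

// map and reduce now emit <IntWritable, Text> instead of <Text, IntWritable>
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(Text.class);

conf.setMapperClass(Map.class);
// no setCombinerClass(...) here: this Reduce just passes values through
conf.setReducerClass(Reduce.class);
```

Without these changes the job fails at runtime with a type mismatch, because the framework checks the emitted key/value classes against what the JobConf declares.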