MapReduce WordCount Program - output is same as the input file

Posted 2020-07-30 04:28

Question:

The output I am expecting is the count of every word in the input file, but my output is the whole input file, exactly as it is. I am using extends Mapper<LongWritable, Text, Text, IntWritable> for the mapper class and Reducer<Text, IntWritable, Text, IntWritable> for the reducer class. Here is my code:

driver.java

public class driver extends Configured implements Tool {

    public int run(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");

        job.setMapperClass(mapper.class);
        job.setReducerClass(reducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
        //JobClient.runJob((JobConf) conf);
        //System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String[] args) throws Exception
    {
        long start = System.currentTimeMillis();

        int res = ToolRunner.run(new Configuration(), new driver(), args);

        long stop = System.currentTimeMillis();
        System.out.println("Time: " + (stop - start));
        System.exit(res);
    }
}

mapper.java

public class mapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
    //hadoop supported data types
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    //map method that performs the tokenizer job and framing the initial key value pairs
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
    {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);

        while (tokenizer.hasMoreTokens())
        {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

reducer.java

public class reducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    //reduce method accepts the key/value pairs from the mappers, aggregates them by key, and produces the final output
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
    {
        int sum = 0;
        while (values.hasNext())
        {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

Answer 1:

You are mixing up the new and old MapReduce APIs. I think you tried to write the WordCount program against the new API but took snippets from the old one (an old blog post, perhaps). You can find the problem yourself if you just add the @Override annotation to both the map and reduce methods.

Compare how the two method signatures changed between the APIs:

  • map: the old API's map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) became map(LongWritable key, Text value, Context context) in the new org.apache.hadoop.mapreduce.Mapper.
  • reduce: the old API's reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) became reduce(Text key, Iterable<IntWritable> values, Context context) in the new org.apache.hadoop.mapreduce.Reducer.

You wrote two new methods with the old signatures, so they do not override anything and are never called. The job falls back to the inherited map and reduce methods, whose default implementations are identity operations: they pass every key/value pair straight through, which is why your output is the same as your input.
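Roughly speaking, the inherited defaults in the new API behave like this (a paraphrase of Hadoop's generic Mapper/Reducer bodies, not compilable on its own):

protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
    // default mapper: write the input pair through unchanged
    context.write((KEYOUT) key, (VALUEOUT) value);
}

protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
    // default reducer: write every value back out with its key
    for (VALUEIN value : values) {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}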

Apart from that, you should follow basic Java coding conventions; class names such as driver, mapper, and reducer should start with an uppercase letter.
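For reference, here is a minimal sketch (illustrative class names, each class in its own file) of the same mapper and reducer written against the new API; because the parameter lists now match the inherited methods, @Override compiles and the methods actually get called:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // emit (word, 1) for every token in the line
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum the counts for each word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

In the driver you would then also set job.setOutputValueClass(IntWritable.class) instead of Text.class, and drop KeyValueTextInputFormat in favour of the default TextInputFormat, which supplies LongWritable byte offsets as keys.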



Answer 2:

Try this:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;


public class WordCount  {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {

            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            System.out.println(line);
            while (tokenizer.hasMoreTokens()) {
                value.set(tokenizer.nextToken());
                output.collect(value, new IntWritable(1));
            }

        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }

            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path("/home/user17/test.txt"));
        FileOutputFormat.setOutputPath(conf, new Path("hdfs://localhost:9000/out2"));

        JobClient.runJob(conf);

    }
}

Build the jar and execute the following command on the command line (the input and output paths are taken from the arguments):

hadoop jar WordCount.jar WordCount /inputfile /outputfile
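For illustration only (hypothetical input): if /inputfile contained the single line "hello world hello", the job output should be per-word counts rather than the input echoed back, e.g.

hello   2
world   1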


Answer 3:

Please run this code if you are facing problems with your own code. It contains the mapper, the reducer, and the main function.

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;    
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
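        // reusing the reducer as a combiner is safe here because summing counts is associative and commutative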
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

After that, create a jar of this code, say wordcount.jar, saved in your home directory (/home/user/wordcount.jar), and run the following command:

hadoop jar wordcount.jar WordCount /inputfile /outputfile

This will create the directory /outputfile under the root of HDFS. View your result with:

hadoop dfs -cat /outputfile/part-00000

This should run your WordCount program successfully.