The output I am expecting is the count of every word in the input file, but my output is just the whole input file, unchanged.
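For example, for a (made-up) input line such as
hello world hello
I would expect output like
hello 2
world 1
but instead I just get the input lines back unchanged.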
I am using extends Mapper<LongWritable, Text, Text, IntWritable> for the mapper class and Reducer<Text, IntWritable, Text, IntWritable> for the reducer class.
Here is my code:
driver.java
public class driver extends Configured implements Tool {

    public int run(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");
        job.setMapperClass(mapper.class);
        job.setReducerClass(reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        //JobClient.runJob((JobConf) conf);
        //System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String[] args) throws Exception
    {
        long start = System.currentTimeMillis();
        //int res = ToolRunner.run(new Configuration(), new driver(), args);
        int res = ToolRunner.run(new Configuration(), new driver(), args);
        long stop = System.currentTimeMillis();
        System.out.println("Time: " + (stop - start));
        System.exit(res);
    }
}
mapper.java
public class mapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
    //hadoop supported data types
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    //map method that tokenizes each line and emits the initial key-value pairs
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
    {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens())
        {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}
reducer.java
public class reducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    //reduce method accepts the key-value pairs from the mappers, aggregates by key, and produces the final output
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
    {
        int sum = 0;
        while (values.hasNext())
        {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
You are perplexed by the new & old APIs of MapReduce. I think you tried to write WordCount program in new API, but took snippets from the old API(a old blogpost perhaps). You can find the problem yourself, if you just add @override
annotation to both the map & reduce methods.
See what their signatures look like after the API evolution:
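With @Override in place the compiler will refuse to build, because in the new org.apache.hadoop.mapreduce API the methods take a Context instead of an OutputCollector and a Reporter. For your type parameters, the signatures are roughly:

protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException { ... }

protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException { ... }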
You just wrote two new methods with the old signatures, so they don't override anything and are never called. The framework calls the inherited map and reduce methods instead, and their default implementations are identity operations, which is exactly why your output is a copy of your input.
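For reference, the inherited defaults look roughly like this (paraphrased from the Hadoop source):

// default map in org.apache.hadoop.mapreduce.Mapper: pass the pair through unchanged
protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
}

// default reduce in org.apache.hadoop.mapreduce.Reducer: write every value back out under its key
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
    for (VALUEIN value : values) {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}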
Anyway, you should follow basic coding conventions (for example, Java class names like driver, mapper, and reducer should start with an uppercase letter).
Try this:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                value.set(tokenizer.nextToken());
                output.collect(value, new IntWritable(1));
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // input and output paths are taken from the command line
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
Make a jar and execute the following command on the command line:
hadoop jar WordCount.jar WordCount /inputfile /outputfile
Please run this code if you are facing a problem with yours. This code contains the mapper, reducer, and main functions.
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
After that, create a jar of this code, say wordcount.jar, saved in your home directory (/home/user/wordcount.jar), and run the following command:
hadoop jar wordcount.jar WordCount /inputfile /outputfile
This will create the directory /outputfile under the root directory of HDFS. View your result with
hadoop dfs -cat /outputfile/part-00000
This will successfully run your wordcount program.