I'm new to Hadoop, and so far I've only managed to run the wordCount example: http://hadoop.apache.org/common/docs/r0.18.2/mapred_tutorial.html
Suppose we have a folder which has 3 files. I want to have one mapper for each file, and this mapper will just count the number of lines and return it to the reducer.
The reducer will then take as an input the number of lines from each mapper, and give as an output the total number of lines that exist in all 3 files.
So if we have the following 3 files
input1.txt
input2.txt
input3.txt
and the mappers return:
mapper1 -> [input1.txt, 3]
mapper2 -> [input2.txt, 4]
mapper3 -> [input3.txt, 9]
the reducer will give an output of
3+4+9 = 16
I have done this in a simple Java application, so I would like to do it in Hadoop. I have just one computer and would like to try running it in a pseudo-distributed environment.
How can I achieve this? What steps should I take?
Should my code look like the one in the Apache example? Will I have two static classes, one for the mapper and one for the reducer? Or should I have three classes, one for each mapper?
If you can, please guide me through this. I have no idea how to do it, and I believe that if I manage to write some code that does this, I will be able to write more complex applications in the future.
Thanks!
In addition to sa125's answer, you can hugely improve performance by not emitting a record for each input record. Instead, accumulate a counter in the mapper and emit the filename and count once, in the mapper's cleanup method (the reducer then sums these counts instead of counting records):
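import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class LineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    // Lines seen by this mapper instance (one instance per input split)
    protected long lines = 0;

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        // Runs once after the last map() call: emit a single (filename, count) record
        FileSplit split = (FileSplit) context.getInputSplit();
        String filename = split.getPath().toString();
        context.write(new Text(filename), new LongWritable(lines));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Count the line; nothing is emitted per record
        lines++;
    }
}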
I noticed you're using the docs for version 0.18. Here's a link to 1.0.2 (latest).
First piece of advice: use an IDE (Eclipse, IDEA, etc.). It'll really help with filling in the blanks.
In actual HDFS, you can't know where each piece of a file resides (different machines and clusters). There's no guarantee that row X will even reside on the same disk as row Y. There's also no guarantee that row X won't be split across different machines (HDFS distributes data in blocks, typically 64 MB each). This means that you can't assume the same mapper will handle an entire file. What you can make sure of is that each file is handled by the same reducer.
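To put numbers on the block-size point, here is a small plain-Java sketch (no Hadoop classes; `BlockMath` and `blocksFor` are made-up names for illustration) that computes how many blocks a file of a given size occupies. It only models the arithmetic, not how Hadoop actually places or splits data.

```java
// Hypothetical helper, not part of Hadoop: ceiling division of file size by block size.
class BlockMath {

    static long blocksFor(long fileSizeBytes, long blockSizeBytes) {
        // Integer ceiling division; an empty file occupies no blocks
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println(blocksFor(200 * mb, 64 * mb)); // prints 4
        System.out.println(blocksFor(1024, 64 * mb));     // prints 1
    }
}
```

Files as small as the three in the question each fit in a single block, so with default settings each of them would still end up with one mapper of its own. The caveat above only starts to matter once a file grows past the block size.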
Since all values for a given key go to the same reducer, the way I'd go about doing this is using the filename as my output key in the mapper. In addition, the default input format for a mapper is TextInputFormat, which means each call to map() receives one whole line (terminated by LF or CR). You can then emit the filename and the number 1 (or whatever, it's irrelevant to the calculation) from your mapper. Then, in the reducer, you simply use a loop to count how many times the filename was received:
in the mapper's map function
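import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// TextInputFormat passes the byte offset of the line as a LongWritable key
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // get the filename (file-based input formats hand out FileSplits)
        FileSplit split = (FileSplit) context.getInputSplit();
        String fileName = split.getPath().getName();
        // send the filename to the reducer, the value
        // has no meaning (I just put "1" to have something)
        context.write(new Text(fileName), new Text("1"));
    }
}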
in the reducer's reduce function
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class Reduce extends Reducer<Text, Text, Text, Text> {
    // The new API passes an Iterable (not an Iterator) of values
    @Override
    public void reduce(Text fileName, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long rowCount = 0;
        // values gets one entry for each row, so the actual value doesn't matter
        // (an Iterable has no size(), so count by looping)
        for (Text val : values) {
            rowCount += 1;
        }
        // fileName is the Text key received (no need to create a new object)
        context.write(fileName, new Text(String.valueOf(rowCount)));
    }
}
in the driver/main
You can pretty much use the same driver as the wordcount example. Note that I used the new mapreduce API, so you'll need to adjust some things (Job instead of JobConf, etc). This was really helpful when I read up on it.
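As a rough sketch of what such a driver could look like with the new API: the class name `LineCountDriver` and the `buildJob` helper are made up for illustration, and the mapper and reducer classes are passed in as parameters so the snippet doesn't depend on where you nest them. It assumes both output key and value are Text, as in the classes from this answer, and that the mapper and reducer ship in the same jar as the driver.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver class; the name and the helper method are illustrative only.
public class LineCountDriver {

    @SuppressWarnings("rawtypes")
    public static Job buildJob(Configuration conf,
                               Class<? extends Mapper> mapperClass,
                               Class<? extends Reducer> reducerClass,
                               Path input, Path output) throws Exception {
        // new Job(conf, name) is the 1.x-era constructor;
        // newer releases prefer Job.getInstance(conf, name)
        Job job = new Job(conf, "line count");
        job.setJarByClass(LineCountDriver.class);
        job.setMapperClass(mapperClass);
        job.setReducerClass(reducerClass);
        // Key (filename) and value (count as text) are both Text
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input is the folder holding the files; output must not exist yet
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        return job;
    }
}
```

From a main method you would then call something like buildJob(new Configuration(), Map.class, Reduce.class, new Path(args[0]), new Path(args[1])).waitForCompletion(true) and exit with 0 or 1 depending on the result. TextInputFormat is the default, so no input format needs to be set.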
Note that your MR output will be just each filename and the rowcount for it:
input1.txt 3
input2.txt 4
input3.txt 9
If you just want to count the TOTAL number of lines in all the files, simply emit the same key in all the mappers (not the filename). This way a single reduce call receives every value and handles all the row counting:
// no need for filename
context.write( new Text("blah"), new Text("1") );
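If it helps to see why the choice of key changes the output, here is a plain-Java simulation of the two variants. It uses no Hadoop classes at all; `LineCountSimulation` and its methods are made-up names, `Map` here is java.util.Map rather than the mapper class, and the in-memory "files" just mirror the line counts from the question. It only imitates the grouping-by-key step that the framework performs between map and reduce.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical simulation, not Hadoop code.
class LineCountSimulation {

    // "Map" + shuffle: emit (key, 1) for every line and group the 1s by key.
    // keyFor decides which output key is used for lines of a given file.
    static Map<String, List<Integer>> mapAndShuffle(Map<String, List<String>> files,
                                                    Function<String, String> keyFor) {
        Map<String, List<Integer>> grouped = new TreeMap<>(); // keys arrive sorted
        for (Map.Entry<String, List<String>> file : files.entrySet()) {
            String key = keyFor.apply(file.getKey());
            for (String line : file.getValue()) {
                grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(1);
            }
        }
        return grouped;
    }

    // "Reduce": one pass per key, counting how many values were received.
    static Map<String, Long> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Long> out = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            long rowCount = 0;
            for (Integer ignored : e.getValue()) {
                rowCount++;
            }
            out.put(e.getKey(), rowCount);
        }
        return out;
    }

    // Builds n placeholder lines.
    static List<String> lines(int n) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            result.add("line " + i);
        }
        return result;
    }

    // Three in-memory "files" with 3, 4 and 9 lines, as in the question.
    static Map<String, List<String>> sampleFiles() {
        Map<String, List<String>> files = new LinkedHashMap<>();
        files.put("input1.txt", lines(3));
        files.put("input2.txt", lines(4));
        files.put("input3.txt", lines(9));
        return files;
    }

    public static void main(String[] args) {
        Map<String, List<String>> files = sampleFiles();
        // Filename as key: prints {input1.txt=3, input2.txt=4, input3.txt=9}
        System.out.println(reduce(mapAndShuffle(files, name -> name)));
        // Constant key: prints {total=16}
        System.out.println(reduce(mapAndShuffle(files, name -> "total")));
    }
}
```

With the filename as key there is one group per file; with a constant key everything lands in one group, which is why a single reduce call ends up with the grand total.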
You can also chain a job that'll process the output of the per-file rowcount, or do other fancy stuff - that's up to you.
I left some boilerplate code out, but the basics are there. Be sure to check up on me, since I was typing most of this from memory.. :)
Hope this helps!