I want to sort the output by the user's last name, while the key being used is firstName. Below are the classes I am using, but the output is not sorted by lastName. I am new to Hadoop and wrote this with help from various internet sources.
Main class:
/**
 * Driver for a secondary-sort job. Each input line "firstName lastName" is emitted under a
 * composite {@link CustomKey}; records are partitioned and grouped by firstName and, within a
 * group, ordered by lastName according to {@link CustomKeySorter}.
 */
public class WordCount {

    /**
     * Parses each input line of the form "firstName lastName [...]" and emits
     * ({@code CustomKey(firstName, lastName)}, lastName). Lines with fewer than two
     * whitespace-separated fields are skipped.
     */
    public static class Map extends Mapper<LongWritable, Text, CustomKey, Text> {
        public static final Log log = LogFactory.getLog(Map.class);

        // Output value holder reused across map() calls instead of allocating per record
        // (the original code reused it the same way).
        private final Text last = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TextInputFormat (configured in main) already hands one line per call,
            // so there is nothing to split on '\n'.
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            // Split on any run of whitespace so repeated spaces/tabs do not yield empty names.
            String[] vals = line.split("\\s+");
            if (vals.length < 2) {
                // Malformed record (no last name): skip it rather than failing the whole
                // task with an ArrayIndexOutOfBoundsException.
                log.warn("Skipping malformed line at offset " + key + ": " + line);
                return;
            }
            last.set(vals[1]);
            context.write(new CustomKey(vals[0], vals[1]), last);
        }
    }

    /**
     * Emits (firstName, lastName) for every record in a firstName group. The grouping
     * comparator configured in main groups on firstName only, so all last names for one
     * first name arrive in a single reduce() call, in the order set by the sort comparator.
     */
    public static class Reduce extends Reducer<CustomKey, Text, Text, Text> {
        public static final Log log = LogFactory.getLog(Reduce.class);

        @Override
        public void reduce(CustomKey key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // firstName is the same for every value in this group, so build the key once.
            Text outKey = new Text(key.getFirstName());
            for (Text val : values) {
                context.write(outKey, val);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args args[0] = input path, args[1] = output path (must not already exist)
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: WordCount <input path> <output path>");
            System.exit(2);
        }
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Intermediate (map output) types: composite key, lastName as value.
        job.setMapOutputKeyClass(CustomKey.class);
        job.setMapOutputValueClass(Text.class);
        // Final output types: (firstName, lastName).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Secondary sort: partition and group on firstName, order by firstName then lastName.
        job.setPartitionerClass(CustomKeyPartinioner.class);
        job.setGroupingComparatorClass(CustomGroupComparator.class);
        job.setSortComparatorClass(CustomKeySorter.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Report job failure through the process exit status instead of always exiting 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Composite key class:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
/**
 * Composite map-output key (firstName, lastName) used for secondary sort.
 * Natural ordering is by firstName, then lastName, both ascending. equals/hashCode
 * are value-based so they agree with compareTo.
 */
public class CustomKey implements WritableComparable<CustomKey> {
    public static final Log log = LogFactory.getLog(CustomKey.class);

    private String firstName, lastName;

    /** No-arg constructor: Hadoop creates keys reflectively and then calls readFields. */
    public CustomKey() {
    }

    public CustomKey(String firstName, String lastName) {
        super();
        this.firstName = firstName;
        this.lastName = lastName;
    }

    /** Deserializes the fields in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.firstName = WritableUtils.readString(in);
        this.lastName = WritableUtils.readString(in);
    }

    /** Serializes firstName then lastName. */
    @Override
    public void write(DataOutput out) throws IOException {
        // Guarded so the string concatenation is skipped when debug logging is off.
        if (log.isDebugEnabled()) {
            log.debug("write value is " + firstName);
        }
        WritableUtils.writeString(out, firstName);
        WritableUtils.writeString(out, lastName);
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    /** Orders by firstName, breaking ties by lastName (both ascending). */
    @Override
    public int compareTo(CustomKey o) {
        int result = firstName.compareTo(o.getFirstName());
        if (log.isDebugEnabled()) {
            log.debug("value is " + result);
        }
        return result != 0 ? result : lastName.compareTo(o.getLastName());
    }

    /** Two keys are equal when both names are equal, consistent with compareTo. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CustomKey)) {
            return false;
        }
        CustomKey other = (CustomKey) obj;
        return Objects.equals(firstName, other.firstName)
                && Objects.equals(lastName, other.lastName);
    }

    /**
     * Value-based hash; required so equal keys hash identically (e.g. under Hadoop's
     * default HashPartitioner, which partitions on key.hashCode()).
     */
    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName);
    }

    @Override
    public String toString() {
        return (new StringBuilder()).append(firstName).append(',').append(lastName).toString();
    }
}
Partitioner class:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
 * Routes every record with the same firstName to the same reducer, by hashing firstName
 * only. This is required for the firstName-based grouping comparator to see complete groups.
 */
public class CustomKeyPartinioner extends Partitioner<CustomKey, Text> {
    HashPartitioner<Text, Text> hashPartitioner = new HashPartitioner<Text, Text>();
    // Reused holder for the partitioning key to avoid allocating a Text per record.
    Text newKey = new Text();

    /**
     * @param key           composite map-output key
     * @param value         map-output value (passed through to the hash partitioner)
     * @param numPartitions number of reduce tasks
     * @return a partition index in [0, numPartitions) derived solely from key's firstName
     */
    @Override
    public int getPartition(CustomKey key, Text value, int numPartitions) {
        // Must be deterministic. The previous catch-all fell back to a random partition,
        // which scatters one firstName across reducers (breaking grouping) and can route a
        // re-executed map attempt's records differently. A missing firstName maps to "".
        String firstName = key.getFirstName();
        newKey.set(firstName == null ? "" : firstName);
        return hashPartitioner.getPartition(newKey, value, numPartitions);
    }
}
Grouping comparator class:
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Grouping comparator. Two keys fall into the same reduce() call exactly when their first
 * names are equal; last names are ignored.
 */
public class CustomGroupComparator extends WritableComparator {

    protected CustomGroupComparator() {
        // createInstances = true: WritableComparator deserializes raw bytes into CustomKey
        // objects before delegating to the object-level compare below.
        super(CustomKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CustomKey left = (CustomKey) a;
        CustomKey right = (CustomKey) b;
        String leftFirst = left.getFirstName();
        return leftFirst.compareTo(right.getFirstName());
    }
}
Custom key sort comparator class:
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Shuffle sort comparator: orders keys by firstName, then by lastName, both ascending.
 * This matches CustomKey#compareTo, so reducers see each firstName group's last names in
 * ascending order.
 */
public class CustomKeySorter extends WritableComparator {
    public static final Log log = LogFactory.getLog(CustomKeySorter.class);

    protected CustomKeySorter() {
        // createInstances = true so the object-level compare below receives deserialized keys.
        super(CustomKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CustomKey key1 = (CustomKey) w1;
        CustomKey key2 = (CustomKey) w2;
        int value = key1.getFirstName().compareTo(key2.getFirstName());
        if (log.isDebugEnabled()) {
            log.debug("value is " + value);
        }
        if (value != 0) {
            return value;
        }
        // Ascending on lastName. Do not negate this result: a negated tie-break emits last
        // names in descending order and disagrees with CustomKey#compareTo.
        return key1.getLastName().compareTo(key2.getLastName());
    }
}