Hadoop M/R secondary sort not working, bases on la

2019-08-15 07:47发布

问题:

I want to sort the output based on lastname of the user, the key being used is firstName. Following are the classes which i am using but i am not getting sorted output based on lastName. I am new to hadoop,this i wrote using help from various internet sources.

Main Class :-

public class WordCount {

    public static class Map extends Mapper<LongWritable, Text, CustomKey, Text> {

        public static final Log log = LogFactory.getLog(Map.class);

        private final static IntWritable one = new IntWritable(1);
        private Text first = new Text();
        private Text last = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            StringTokenizer tokenizer = new StringTokenizer(line, "\n");

            log.info(" line issssssss " + tokenizer.hasMoreTokens());
            while (tokenizer.hasMoreTokens()) {

                String[] vals = tokenizer.nextToken().split(" ");

                first.set(vals[0]);
                last.set(vals[1]);

                context.write(new CustomKey(first.toString(), last.toString()), last);

            }
        }
    }

    public static class Reduce extends Reducer<CustomKey, Text, Text, Text> {

        public static final Log log = LogFactory.getLog(Reduce.class);

        @Override
        public void reduce(CustomKey key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;

            // Text value = new Text();

                for (Text val : values) {
                context.write(new Text(key.getFirstName()), val);
            }

        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(WordCount.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapOutputKeyClass(CustomKey.class);
        job.setPartitionerClass(CustomKeyPartinioner.class);
        job.setGroupingComparatorClass(CustomGroupComparator.class);
        job.setSortComparatorClass(CustomKeySorter.class);


          job.setInputFormatClass(TextInputFormat.class);
          job.setOutputFormatClass(TextOutputFormat.class);


        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

}

Composite key class :- 

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

public class CustomKey implements WritableComparable<CustomKey> {


    public static final Log log = LogFactory.getLog(CustomKey.class);

    private String firstName, lastName;

     public CustomKey() {
        // TODO Auto-generated constructor stub
    }

    @Override
    public void readFields(DataInput arg0) throws IOException {

        this.firstName = WritableUtils.readString(arg0);
        this.lastName = WritableUtils.readString(arg0);
    }

    public String getFirstName() {
        return firstName;
    }

    public CustomKey(String firstName, String lastName) {
        super();
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public void write(DataOutput arg0) throws IOException {

        log.debug("write value is " + firstName);

        WritableUtils.writeString(arg0, firstName);
        WritableUtils.writeString(arg0, lastName);
    }





    @Override
    public int compareTo(CustomKey o) {


        int result = firstName.compareTo(o.getFirstName());

        log.debug("value is " + result);


        if (result == 0) {
            return lastName.compareTo(o.getLastName());
        }

        return result;
    }

    @Override
    public String toString() {

    return (new StringBuilder()).append(firstName).append(',').append(lastName).toString();
    }

}


Partitioner Class :- 

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class CustomKeyPartinioner extends Partitioner<CustomKey, Text> {

    HashPartitioner<Text, Text> hashPartitioner = new HashPartitioner<Text, Text>();
    Text newKey = new Text();

    @Override
    public int getPartition(CustomKey arg0, Text arg1, int arg2) {

        try {
            newKey.set(arg0.getFirstName());
            return hashPartitioner.getPartition(newKey, arg1, arg2);
        } catch (Exception e) {

            e.printStackTrace();
            return (int) (Math.random() * arg2);
        }
    }

}


GroupComparator Class :- 

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CustomGroupComparator extends WritableComparator {

    protected CustomGroupComparator() {

        super(CustomKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {

        CustomKey key1 = (CustomKey) w1;
        CustomKey key2 = (CustomKey) w2;

    // (check on udid)
        return key1.getFirstName().compareTo(key2.getFirstName());
    }
}


Custom Key Sorter Class :- 

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CustomKeySorter extends WritableComparator {

    public static final Log log = LogFactory.getLog(CustomKeySorter.class);

    protected CustomKeySorter() {

        super(CustomKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {

        CustomKey key1 = (CustomKey) w1;
        CustomKey key2 = (CustomKey) w2;

        int value = key1.getFirstName().compareTo(key2.getFirstName());



        log.debug("value is " + value);

        if (value == 0) {

            return -key1.getLastName().compareTo(key2.getLastName());

        }
        return value;
    }
}