Can the combiner and reducer be different?

Posted 2019-02-06 14:26

In many MapReduce programs, I see the reducer class also being used as the combiner. I know this works because of the specific nature of those programs, but I am wondering whether the two can be different.

4 answers
smile是对你的礼貌
Reply #2 · 2019-02-06 14:46

Yes, they can certainly be different, but I don't think you want to use a different class, because in most cases you will get unexpected results.

Combiners can only be used for functions that are commutative (a·b = b·a) and associative (a·(b·c) = (a·b)·c). This is because a combiner may operate on only a subset of a mapper's keys and values, may run more than once, or may not run at all, and you still want the output of the program to remain the same.

A combiner class whose logic differs from the reducer's can easily break this guarantee and produce incorrect output.
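To make the "subset, or not at all" point concrete, here is a plain-Java sketch (no Hadoop involved; the class and method names are hypothetical) that applies a summing "combiner" to different groupings of the same values. Because addition is commutative and associative, every grouping gives the same final result as running with no combiner.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation, not Hadoop code: shows that a commutative and
// associative function gives the same result no matter how its input is split.
class SumCombinerDemo {

    // Stands in for a combiner/reducer body: sums the values it is given.
    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    // Splits the values into chunks of the given size (simulating spills),
    // "combines" each chunk, then "reduces" the partial sums.
    static long sumWithCombiner(List<Long> values, int chunkSize) {
        List<Long> partials = new ArrayList<>();
        for (int i = 0; i < values.size(); i += chunkSize) {
            int end = Math.min(i + chunkSize, values.size());
            partials.add(sum(values.subList(i, end)));
        }
        return sum(partials);
    }

    public static void main(String[] args) {
        List<Long> values = List.of(3L, 1L, 4L, 1L, 5L, 9L);
        System.out.println(sum(values));                // no combiner: 23
        System.out.println(sumWithCombiner(values, 2)); // combiner on pairs: 23
        System.out.println(sumWithCombiner(values, 4)); // uneven spills: 23
    }
}
```

Swapping `sum` for a function that is not associative (an average, for example) is exactly what makes the chunked and unchunked results diverge.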

Lonely孤独者°
Reply #3 · 2019-02-06 15:01

Here is an implementation that gives exactly the same answer whether you run it with or without the combiner. In it, the reducer and the combiner have different purposes and different implementations.

package combiner;

import java.io.IOException;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map extends Mapper<LongWritable, Text, Text, Average> {

    private final Text name = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Each input line is expected to look like "<name> <number>".
        String[] row = line.toString().split(" ");
        name.set(row[0]);
        // Emit (name, (number, count = 1)).
        context.write(name, new Average(Long.parseLong(row[1]), 1));
    }
}

Reduce Class

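package combiner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, Average, Text, LongWritable> {

    private final LongWritable avg = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<Average> values, Context context)
            throws IOException, InterruptedException {
        long total = 0;
        long count = 0;
        // Each value holds a partial sum plus the number of inputs behind it,
        // whether it came straight from the mapper or from the combiner.
        for (Average value : values) {
            total += value.number;
            count += value.count;
        }
        // Divide only once, after all partial sums have been merged.
        avg.set(total / count);
        context.write(key, avg);
    }
}

Average Class (map output value)

package combiner;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

public class Average implements Writable {

    long number; // partial sum of the input numbers
    int count;   // how many input numbers are folded into 'number'

    // No-arg constructor, needed by Hadoop to deserialize the value.
    public Average() {}

    public Average(long number, int count) {
        this.number = number;
        this.count = count;
    }

    public long getNumber() { return number; }
    public void setNumber(long number) { this.number = number; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        number = WritableUtils.readVLong(dataInput);
        count = WritableUtils.readVInt(dataInput);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeVLong(dataOutput, number);
        WritableUtils.writeVInt(dataOutput, count);
    }
}

Combiner Class

package combiner;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input and output types are both (Text, Average), so Hadoop can run this
// zero, one, or several times on any subset of the map output.
public class Combine extends Reducer<Text, Average, Text, Average> {

    @Override
    protected void reduce(Text name, Iterable<Average> values, Context context)
            throws IOException, InterruptedException {
        long total = 0;
        int count = 0;
        // Merge partial sums and counts. Do not divide here: integer
        // averages of averages would lose precision.
        for (Average value : values) {
            total += value.number;
            count += value.count;
        }
        context.write(name, new Average(total, count));
    }
}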

Driver Class

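package combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver1 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: Driver1 <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "CustomCombiner");
        job.setJarByClass(Driver1.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Combine.class); // comment out to run without the combiner
        job.setReducerClass(Reduce.class);
        // Map output types (also the combiner's input and output types).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Average.class);
        // Final output types, matching Reduce's LongWritable output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}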

Get the code from here.

Leave your suggestions.
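Because Hadoop may run a combiner on each spill and then again while merging spills, a combiner has to treat its input as possibly already-combined records. The plain-Java sketch below (no Hadoop; `Pair` and the method names are hypothetical, and it uses (sum, count) pairs) contrasts a combiner that adds 1 per incoming record with one that adds up the incoming counts, when both are applied twice.

```java
import java.util.List;
import java.util.function.Function;

// Plain-Java simulation of a combiner being applied twice, not Hadoop code.
class RecombineDemo {

    // A (partial sum, count) record, like a map-output value.
    static final class Pair {
        final long sum;
        final int count;

        Pair(long sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Buggy pattern: counts every incoming record as one value, which is only
    // true the first time the combiner sees raw mapper output.
    static Pair combineCountingOnes(List<Pair> values) {
        long sum = 0;
        int count = 0;
        for (Pair p : values) {
            sum += p.sum;
            count += 1;
        }
        return new Pair(sum, count);
    }

    // Safe pattern: adds up the counts it receives, so re-running it is harmless.
    static Pair combineMergingCounts(List<Pair> values) {
        long sum = 0;
        int count = 0;
        for (Pair p : values) {
            sum += p.sum;
            count += p.count;
        }
        return new Pair(sum, count);
    }

    static long averageAfterTwoPasses(Function<List<Pair>, Pair> combiner) {
        // First pass: the combiner runs separately on each spill of raw values 2, 4 and 6.
        Pair spill1 = combiner.apply(List.of(new Pair(2, 1), new Pair(4, 1)));
        Pair spill2 = combiner.apply(List.of(new Pair(6, 1)));
        // Second pass: the combiner runs again while the spills are merged.
        Pair merged = combiner.apply(List.of(spill1, spill2));
        // Reducer: divide once.
        return merged.sum / merged.count;
    }

    public static void main(String[] args) {
        System.out.println(averageAfterTwoPasses(RecombineDemo::combineCountingOnes));  // 6 (wrong)
        System.out.println(averageAfterTwoPasses(RecombineDemo::combineMergingCounts)); // 4
    }
}
```

The true average of 2, 4 and 6 is 4; only the combiner that merges counts keeps that answer when it runs more than once.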

可以哭但决不认输i
Reply #4 · 2019-02-06 15:05

The primary goal of a combiner is to minimize the number of key-value pairs shuffled across the network from mappers to reducers, and thus to save as much bandwidth as possible.
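A rough plain-Java illustration of the bandwidth saving (no Hadoop; the class and method names are hypothetical): a word-count style "mapper" emits one pair per word, and a summing "combiner" over that mapper's output leaves one pair per distinct key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of local aggregation before the shuffle, not Hadoop code.
class ShuffleSavingsDemo {

    // Simulates the mapper: emits one (word, 1) pair per word.
    static List<Map.Entry<String, Integer>> mapOutput(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            out.add(Map.entry(word, 1));
        }
        return out;
    }

    // Simulates a summing combiner over one mapper's output:
    // one pair per distinct key, in first-seen order.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            combined.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = mapOutput("a b a c a b");
        System.out.println(pairs.size());          // 6 pairs would be shuffled
        System.out.println(combine(pairs).size()); // 3 pairs after combining
        System.out.println(combine(pairs));        // {a=3, b=2, c=1}
    }
}
```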

The rule of thumb for a combiner is that it must have the same input and output types. The reason is that the combiner is not guaranteed to run: it may or may not be used, depending on the data volume and the number of spills, so whatever consumes its output must be able to treat it exactly like map output.

A reducer can therefore be used as the combiner only when it satisfies this rule, i.e. its input and output types are the same.

The other important rule for a combiner is that it can only be used when the function you want to apply is both commutative and associative, like adding numbers, but not something like computing an average (if you reuse the same code as the reducer).
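A small plain-Java sketch of the average pitfall (no Hadoop; names are hypothetical): reusing "average" logic as the combiner makes the reducer average partial averages, while emitting a (sum, count) pair from the combiner keeps the result exact.

```java
import java.util.List;

// Plain-Java illustration of why an averaging reducer cannot simply be reused as a combiner.
class AverageCombinerDemo {

    // Average of all values, computed in one place (no combiner).
    static double mean(List<Double> values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total / values.size();
    }

    // Wrong: each combiner averages its part, then the reducer averages the averages.
    static double meanOfMeans(List<Double> part1, List<Double> part2) {
        return mean(List.of(mean(part1), mean(part2)));
    }

    // Combiner output for one part: {sum, count}.
    static double[] sumAndCount(List<Double> part) {
        double sum = 0;
        for (double v : part) {
            sum += v;
        }
        return new double[] {sum, part.size()};
    }

    // Right: the reducer adds up sums and counts, then divides once.
    static double meanFromSumCount(List<Double> part1, List<Double> part2) {
        double[] a = sumAndCount(part1);
        double[] b = sumAndCount(part2);
        return (a[0] + b[0]) / (a[1] + b[1]);
    }

    public static void main(String[] args) {
        List<Double> spill1 = List.of(1.0, 2.0, 3.0);
        List<Double> spill2 = List.of(10.0);
        System.out.println(mean(List.of(1.0, 2.0, 3.0, 10.0))); // 4.0
        System.out.println(meanOfMeans(spill1, spill2));         // 6.0 (wrong)
        System.out.println(meanFromSumCount(spill1, spill2));    // 4.0
    }
}
```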

Now, to answer your question: yes, of course they can be different. When your reducer has different input and output types, you have no choice but to write a separate combiner, for example by making a copy of your reducer code and modifying it.

The logic can differ too. For example, a combiner can use a collection object as a local buffer for all the values it receives. This is less risky in a combiner than in a reducer, because a reducer sees every value for a key and is more likely to run out of memory, while a combiner sees only part of one mapper's output. Other logic differences can and do exist.
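As a hypothetical illustration of a combiner whose logic differs from the reducer's, consider a "count distinct values per key" job, simulated here in plain Java (no Hadoop; all names are assumptions). The combiner buffers one mapper's values in a local `Set` and emits each distinct value once, keeping the same value type; the reducer has a different output type (a count).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java simulation of a distinct-count job with a de-duplicating combiner.
class DistinctCountDemo {

    // Combiner: buffers one mapper's values for a key in a local Set and
    // emits each distinct value once (same value type in and out).
    static List<String> combine(List<String> values) {
        Set<String> buffer = new LinkedHashSet<>(values);
        return new ArrayList<>(buffer);
    }

    // Reducer: different logic and output type (a count). Its Set grows with
    // the number of distinct values for the key across all mappers, which is
    // where the memory risk sits.
    static int reduce(List<List<String>> valuesFromAllMappers) {
        Set<String> distinct = new LinkedHashSet<>();
        for (List<String> values : valuesFromAllMappers) {
            distinct.addAll(values);
        }
        return distinct.size();
    }

    public static void main(String[] args) {
        List<String> mapper1 = List.of("x", "y", "x", "x");
        List<String> mapper2 = List.of("y", "z", "z");
        // Without combiner: the reducer receives all 7 values.
        System.out.println(reduce(List.of(mapper1, mapper2)));                   // 3
        // With combiner: only 2 + 2 values are shuffled, same answer.
        System.out.println(combine(mapper1).size() + combine(mapper2).size());  // 4
        System.out.println(reduce(List.of(combine(mapper1), combine(mapper2)))); // 3
    }
}
```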

在下西门庆
Reply #5 · 2019-02-06 15:07

Yes, a combiner can be different from the reducer, although your combiner will still be a Reducer (a subclass of Reducer in the new API, or an implementation of the Reducer interface in the old mapred API). Combiners can only be used in specific, job-dependent cases. A combiner operates like a reducer, but only on a subset of the key/value pairs output by each mapper.

One constraint your combiner has, unlike a reducer, is that its input and output key and value types must all match the output types of your mapper.
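The type constraint can be spelled out with generics. The interfaces below are hypothetical (they are not Hadoop's API) and only sketch the rule: a combiner consumes and produces the mapper's output types `(K, V)`, while a reducer may produce any types `(OK, OV)`. Because the combiner's output type equals its input type, it can be applied twice and still feed the reducer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical combiner shape: consumes and produces the mapper's output types.
interface CombinerFn<K, V> {
    List<Map.Entry<K, V>> combine(K key, List<V> values);
}

// Hypothetical reducer shape: consumes (K, V) but may produce any (OK, OV).
interface ReducerFn<K, V, OK, OV> {
    List<Map.Entry<OK, OV>> reduce(K key, List<V> values);
}

class TypeRuleDemo {

    // Example combiner: sums Long values, emitting (String, Long) like its input.
    static final CombinerFn<String, Long> SUM_COMBINER =
            (key, values) -> List.of(Map.entry(key, values.stream().mapToLong(Long::longValue).sum()));

    // Example reducer: same input types, but emits (String, String).
    static final ReducerFn<String, Long, String, String> FORMAT_REDUCER =
            (key, values) -> List.of(Map.entry(key, "total=" + values.stream().mapToLong(Long::longValue).sum()));

    // Runs the combiner and keeps only the values (assumes it does not change the key).
    static <K, V> List<V> combineValues(K key, List<V> values, CombinerFn<K, V> combiner) {
        List<V> out = new ArrayList<>();
        for (Map.Entry<K, V> e : combiner.combine(key, values)) {
            out.add(e.getValue());
        }
        return out;
    }

    // Applies the combiner twice (e.g. per spill, then while merging); this only
    // type-checks because the combiner's output types equal its input types.
    static <K, V, OK, OV> List<Map.Entry<OK, OV>> run(
            K key, List<V> values, CombinerFn<K, V> combiner, ReducerFn<K, V, OK, OV> reducer) {
        List<V> once = combineValues(key, values, combiner);
        List<V> twice = combineValues(key, once, combiner);
        return reducer.reduce(key, twice);
    }

    public static void main(String[] args) {
        System.out.println(run("a", List.of(1L, 2L, 3L), SUM_COMBINER, FORMAT_REDUCER)); // [a=total=6]
    }
}
```

A combiner declared with different output types (say, `String` values) would no longer fit the `CombinerFn<K, V>` shape, which mirrors why Hadoop requires the combiner to emit the map output types.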
