Setting the Number of Reducers in a MapReduce job


Question:

I have a five-node cluster; three of the nodes run DataNodes and TaskTrackers.

I've imported around 10 million rows from Oracle via Sqoop and process them with a MapReduce job inside an Oozie workflow.

The MapReduce job takes about 30 minutes and uses only one reducer.

Edit: if I run the MapReduce code on its own, outside Oozie, job.setNumReduceTasks(4) correctly launches four reducers.

I have tried the following methods to manually set the number of reducers to four, with no success:

In Oozie, set the following property in the <configuration> tag of the map-reduce action node:

<property><name>mapred.reduce.tasks</name><value>4</value></property>
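
For context, the map-reduce action in the workflow looks roughly like this (a minimal sketch; the action name and the ${jobTracker}/${nameNode} parameters are placeholders, not the real values):

<action name="process-rows">
    <map-reduce>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapred.reduce.tasks</name>
                <value>4</value>
            </property>
            <!-- mapper/reducer classes, input and output paths, etc. -->
        </configuration>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
</action>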

In the main method of the MapReduce driver:

Configuration conf = new Configuration();
Job job = new Job(conf, "10 million rows");
...
job.setNumReduceTasks(4);

I also tried:

Configuration conf = new Configuration();
Job job = new Job(conf, "10 million rows");
...
conf.set("mapred.reduce.tasks", "4");

My map function looks similar to this:

public void map(Text key, Text value, Context context)
        throws IOException, InterruptedException {
    CustomObj customObj = new CustomObj(key.toString());
    // Re-key each record by the object's ID.
    context.write(new Text(customObj.getId()), customObj);
}

I believe there are roughly 80,000 distinct values for the ID.

My reduce function looks similar to this:

public void reduce(Text key, Iterable<CustomObj> vals, Context context)
        throws IOException, InterruptedException {
    OtherCustomObj otherCustomObj = new OtherCustomObj();
    ...
    context.write(null, otherCustomObj);
}
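
As an aside, since the output key is unused here, the conventional pattern (assuming the job's output key type is declared as NullWritable) would be:

context.write(NullWritable.get(), otherCustomObj);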

The custom object emitted by the mapper implements WritableComparable, but the other custom object emitted by the reducer does not.
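
For reference, here is a minimal sketch of what a WritableComparable mapper output like CustomObj entails (the single id field is an assumption, not the real class):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class CustomObj implements WritableComparable<CustomObj> {
    private String id;

    public CustomObj() {}                  // Hadoop requires a no-arg constructor

    public CustomObj(String raw) {
        this.id = raw;
    }

    public String getId() {
        return id;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);                  // serialize fields in a fixed order
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();                 // deserialize in the same order
    }

    @Override
    public int compareTo(CustomObj other) {
        return id.compareTo(other.id);     // ordering used during the shuffle sort
    }
}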

Here are the logs for the file system counters, job counters, and Map-Reduce framework counters, which show that only one reduce task was launched.

 map 100% reduce 100%
 Job complete: job_201401131546_0425
 Counters: 32
   File System Counters
     FILE: Number of bytes read=1370377216
     FILE: Number of bytes written=2057213222
     FILE: Number of read operations=0
     FILE: Number of large read operations=0
     FILE: Number of write operations=0
     HDFS: Number of bytes read=556345690
     HDFS: Number of bytes written=166938092
     HDFS: Number of read operations=18
     HDFS: Number of large read operations=0
     HDFS: Number of write operations=1
   Job Counters 
     Launched map tasks=11
     Launched reduce tasks=1
     Data-local map tasks=11
     Total time spent by all maps in occupied slots (ms)=1268296
     Total time spent by all reduces in occupied slots (ms)=709774
     Total time spent by all maps waiting after reserving slots (ms)=0
     Total time spent by all reduces waiting after reserving slots (ms)=0
   Map-Reduce Framework
     Map input records=9440000
     Map output records=9440000
     Map output bytes=666308476
     Input split bytes=1422
     Combine input records=0
     Combine output records=0
     Reduce input groups=80000
     Reduce shuffle bytes=685188530
     Reduce input records=9440000
     Reduce output records=2612760
     Spilled Records=28320000
     CPU time spent (ms)=1849500
     Physical memory (bytes) snapshot=3581157376
     Virtual memory (bytes) snapshot=15008251904
     Total committed heap usage (bytes)=2848063488

Edit: I modified the MapReduce job to introduce a custom partitioner, a sort comparator, and a grouping comparator (sketched below). For some reason, the job now launches two reducers (when scheduled via Oozie), but still not four.
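
A minimal sketch of the kind of partitioner involved, assuming plain hash partitioning on the ID (the class name is a placeholder):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class IdPartitioner extends Partitioner<Text, CustomObj> {
    @Override
    public int getPartition(Text key, CustomObj value, int numPartitions) {
        // Mask the sign bit so the result is non-negative, then spread
        // the ~80,000 distinct IDs across the available reducers.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It is wired up in the driver via job.setPartitionerClass(IdPartitioner.class), with the two comparators registered through job.setSortComparatorClass(...) and job.setGroupingComparatorClass(...). Note that getPartition only distributes keys across however many reducers the framework actually grants; it cannot raise the reducer count on its own.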

I set the mapred.tasktracker.map.tasks.maximum property to 20 on each TaskTracker (and on the JobTracker) and restarted them, but with no result.

Answer 1:

Just as a starting point: what is the value of the following property in mapred-site.xml?

<property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>4</value>
</property>
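
For reducers specifically, the analogous slot limit is mapred.tasktracker.reduce.tasks.maximum (it defaults to 2), which caps how many reduce tasks each TaskTracker may run concurrently, so it is worth checking as well:

<property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>4</value>
</property>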