I have a five-node cluster; three of the nodes run DataNodes and TaskTrackers.
I've imported around 10 million rows from Oracle via Sqoop and process them with MapReduce in an Oozie workflow.
The MapReduce job takes about 30 minutes and is only using one reducer.
Edit: If I run the MapReduce code on its own, outside of Oozie, job.setNumReduceTasks(4) correctly sets up 4 reducers.
I have tried the following methods to manually set the number of reducers to four, with no success:
In Oozie, setting the following property inside the <configuration> tag of the map-reduce action node:
<property><name>mapred.reduce.tasks</name><value>4</value></property>
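For reference, that property sits inside the map-reduce action of the workflow, which looks roughly like this (the action name, job tracker, name node, and transitions below are placeholders, not my real values):

<action name="process-rows">
    <map-reduce>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapred.reduce.tasks</name>
                <value>4</value>
            </property>
            <!-- mapper/reducer classes, input/output dirs, etc. omitted -->
        </configuration>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
</action>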
In the MapReduce Java code's main method:
Configuration conf = new Configuration();
Job job = new Job(conf, "10 million rows");
...
job.setNumReduceTasks(4);
I also tried:
Configuration conf = new Configuration();
Job job = new Job(conf, "10 million rows");
...
conf.set("mapred.reduce.tasks", "4");
My map function looks similar to this:
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    CustomObj customObj = new CustomObj(key.toString());
    context.write(new Text(customObj.getId()), customObj);
}
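For completeness, that method sits in a class declared along these lines (RowMapper is a placeholder name for my actual mapper):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RowMapper extends Mapper<Text, Text, Text, CustomObj> {
    // the map(...) method shown above goes here
}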
I think there are around 80,000 distinct values for the ID.
My Reduce function looks similar to this:
public void reduce(Text key, Iterable<CustomObj> vals, Context context) throws IOException, InterruptedException {
    OtherCustomObj otherCustomObj = new OtherCustomObj();
    ...
    context.write(null, otherCustomObj);
}
The custom object emitted by the Mapper implements WritableComparable, but the other custom object emitted by the Reducer does not implement WritableComparable.
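To make that concrete, CustomObj is shaped roughly like this (the single field shown here is illustrative; the real class carries more of the row):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class CustomObj implements WritableComparable<CustomObj> {
    private String id;                 // illustrative field

    public CustomObj() { }             // no-arg constructor required by Hadoop

    public CustomObj(String row) {
        this.id = row;                 // the real constructor parses the whole row
    }

    public String getId() { return id; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
    }

    @Override
    public int compareTo(CustomObj other) {
        return id.compareTo(other.id);
    }
}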
Here are the logs showing the file system counters, job counters, and Map-Reduce framework counters, where you can see that only one reduce task was launched.
map 100% reduce 100%
Job complete: job_201401131546_0425
Counters: 32
File System Counters
FILE: Number of bytes read=1370377216
FILE: Number of bytes written=2057213222
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=556345690
HDFS: Number of bytes written=166938092
HDFS: Number of read operations=18
HDFS: Number of large read operations=0
HDFS: Number of write operations=1
Job Counters
Launched map tasks=11
Launched reduce tasks=1
Data-local map tasks=11
Total time spent by all maps in occupied slots (ms)=1268296
Total time spent by all reduces in occupied slots (ms)=709774
Total time spent by all maps waiting after reserving slots (ms)=0
Total time spent by all reduces waiting after reserving slots (ms)=0
Map-Reduce Framework
Map input records=9440000
Map output records=9440000
Map output bytes=666308476
Input split bytes=1422
Combine input records=0
Combine output records=0
Reduce input groups=80000
Reduce shuffle bytes=685188530
Reduce input records=9440000
Reduce output records=2612760
Spilled Records=28320000
CPU time spent (ms)=1849500
Physical memory (bytes) snapshot=3581157376
Virtual memory (bytes) snapshot=15008251904
Total committed heap usage (bytes)=2848063488
Edit: I modified the MapReduce job to introduce a custom partitioner, a sort comparator, and a grouping comparator. For some reason, it now launches two reducers (when scheduled via Oozie), but still not four.
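The partitioner works off the ID key; roughly along these lines (the class name and the hash logic here are illustrative, not my exact code):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class IdPartitioner extends Partitioner<Text, CustomObj> {
    @Override
    public int getPartition(Text key, CustomObj value, int numPartitions) {
        // spread the ~80,000 IDs across however many reducers the job is given
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}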
I set the mapred.tasktracker.map.tasks.maximum property to 20 on each TaskTracker (and on the JobTracker) and restarted them, but it made no difference.
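That is the standard mapred-site.xml form of the property, i.e.:

<property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>20</value>
</property>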