On what basis mapreduce framework decides whether

Posted 2019-01-22 14:48

Question:

As per definition "The Combiner may be called 0, 1, or many times on each key between the mapper and reducer."

I want to know on what basis the MapReduce framework decides how many times the combiner will be launched.

Answer 1:

It depends simply on the number of spills to disk. Each time the MapOutputBuffer fills up, its contents are sorted and spilled, and the combiner runs as part of that spill.

You can tune the number of spills to disk with the parameters io.sort.mb, io.sort.spill.percent, and io.sort.record.percent; these are also explained in the documentation (books and online resources).
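As a rough illustration of how these settings interact, the sketch below estimates the number of buffered bytes at which a spill would begin. It is not Hadoop code: the class and method names are hypothetical, the values 100 MB and 0.80 are assumed to be the defaults, and the share of the buffer reserved by io.sort.record.percent is ignored for simplicity.

```java
// Back-of-the-envelope sketch, not Hadoop code: class and method names are hypothetical.
class SpillThreshold {
    // Approximate number of buffered bytes at which a spill starts.
    // Assumption: the whole io.sort.mb buffer holds record data; the share
    // reserved by io.sort.record.percent is ignored here.
    static long thresholdBytes(int ioSortMb, double ioSortSpillPercent) {
        long bufferBytes = ioSortMb * 1024L * 1024L;
        return (long) (bufferBytes * ioSortSpillPercent);
    }

    public static void main(String[] args) {
        // 100 MB and 0.80 are assumed to be the default settings.
        System.out.println(thresholdBytes(100, 0.80));
        // Doubling io.sort.mb doubles the threshold, so fewer spills are expected.
        System.out.println(thresholdBytes(200, 0.80));
    }
}
```

Under this simplified model, a larger io.sort.mb or a higher io.sort.spill.percent raises the threshold, which should mean fewer spills and therefore fewer combiner runs.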

Example for specific numbers of combiner runs:

0 -> no combiner was defined

1 -> a combiner was defined and the MapOutputBuffer filled up once

>1 -> a combiner was defined and the MapOutputBuffer filled up more than once

Note that even if the MapOutputBuffer never fills up completely, this buffer must be flushed at the end of the map stage and thus triggers the combiner to run at least once (if defined).
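The counting rule described in this answer can be sketched as a toy model. This is only an illustration, not the framework's implementation: the class, the method names, and the byte-based threshold are all assumptions, and real spills also account for record metadata and partitions.

```java
import java.util.List;

// Toy model of the map-side buffer; NOT Hadoop code. All names are hypothetical.
class SpillModel {
    // Counts spills: one each time the buffered bytes reach the threshold,
    // plus one final flush if anything is left at the end of the map stage.
    static int countSpills(List<Integer> recordSizes, long thresholdBytes) {
        int spills = 0;
        long buffered = 0;
        for (int size : recordSizes) {
            buffered += size;
            if (buffered >= thresholdBytes) {
                spills++;      // buffer "filled up": sort, combine, spill
                buffered = 0;
            }
        }
        if (buffered > 0) {
            spills++;          // final flush of the partially filled buffer
        }
        return spills;
    }

    // In this model the combiner runs once per spill, and never if undefined.
    static int combinerRuns(boolean combinerDefined, int spills) {
        return combinerDefined ? spills : 0;
    }

    public static void main(String[] args) {
        int spills = countSpills(List.of(40, 40, 40, 40, 40), 100);
        System.out.println("spills = " + spills);
        System.out.println("combiner runs = " + combinerRuns(true, spills));
    }
}
```

With five 40-byte records and a 100-byte threshold, the buffer fills once after the third record and the remaining two records are flushed at the end, which gives two spills in this model.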



Answer 2:

First of all, Thomas Jungblut's answer is great and I gave it my upvote. The only thing I want to add is that the combiner will always run at least once per mapper if defined, unless the mapper output is empty or is a single pair. So it is possible for the combiner not to be executed in the mapper, but highly unlikely.



Answer 3:

The source code contains the logic that decides whether to invoke the combiner.

Lines 1950 to 1955 of https://github.com/apache/hadoop/blob/0b8a7c18ddbe73b356b3c9baf4460659ccaee095/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java

 if (combinerRunner == null || numSpills < minSpillsForCombine) {
     Merger.writeFile(kvIter, writer, reporter, job);
 } else {
     combineCollector.setWriter(writer);
     combinerRunner.combine(kvIter, combineCollector);
 }

So, in this code, the combiner runs if:

It is defined (combinerRunner is not null), and the number of spills is at least minSpillsForCombine. minSpillsForCombine is driven by the property "mapreduce.map.combine.minspills", whose default value is 3.
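To make the branch in the quoted snippet easier to read, here is its negated condition written as a standalone predicate. This is only a sketch: the class and method names are hypothetical, and a plain boolean stands in for the combinerRunner != null check.

```java
// Sketch of the condition in the quoted snippet; names are hypothetical, not Hadoop API.
class CombineDecision {
    // Default of "mapreduce.map.combine.minspills" as stated above.
    static final int DEFAULT_MIN_SPILLS_FOR_COMBINE = 3;

    // The snippet skips the combiner when
    //   combinerRunner == null || numSpills < minSpillsForCombine,
    // so the combiner runs exactly when both parts are false.
    static boolean combinerRuns(boolean combinerDefined, int numSpills, int minSpillsForCombine) {
        return combinerDefined && numSpills >= minSpillsForCombine;
    }

    public static void main(String[] args) {
        System.out.println(combinerRuns(true, 2, DEFAULT_MIN_SPILLS_FOR_COMBINE));
        System.out.println(combinerRuns(true, 3, DEFAULT_MIN_SPILLS_FOR_COMBINE));
        System.out.println(combinerRuns(false, 5, DEFAULT_MIN_SPILLS_FOR_COMBINE));
    }
}
```

Note that the comparison is inclusive: with the default of 3, exactly three spills are enough for the combiner to be used in this branch.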