OOM exception in Hadoop Reduce child

Asked 2019-09-09 17:18

I am getting an OOM exception (Java heap space) in the reduce child. In the reducer, I append all the values to a StringBuilder, which becomes the output of the reduce call. The number of values isn't that large. I tried increasing mapred.reduce.child.java.opts to 512M and then to 1024M, but that doesn't help (a sketch of how that is set is included below, after the exception locations). Reducer code is given below.

    StringBuilder adjVertexStr = new StringBuilder();
    long itcount = 0;
    while (values.hasNext()) {
        adjVertexStr.append(values.next().toString()).append(" ");
        itcount++;
    }
    log.info("Size of iterator: " + itcount);
    multipleOutputs.getCollector("vertex", reporter).collect(key, new Text(""));
    multipleOutputs.getCollector("adjvertex", reporter).collect(adjVertexStr, new Text(""));

I get exceptions at 3 places in the above code.

  1. The line number in the exception stack trace points to the while-loop statement that appends the strings.
  2. At the last line, the collect() statement.
  3. I originally had a Set accumulating all the values, so that there would be no duplicates; I removed it later on.

Some sample iterator sizes are: 238695, 1, 13, 673, 1, 1, etc. These are not very large values. Why do I keep getting the OOM exception? Any help would be valuable to me.
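
For reference, setting that property on the (old-API) JobConf looks roughly like this; just a sketch, and note the value must be JVM flags such as -Xmx1024m rather than a bare size:

    import org.apache.hadoop.mapred.JobConf;

    JobConf conf = new JobConf(UndirectedGraphPartitioner.class);
    // mapred.reduce.child.java.opts takes JVM flags, not a bare size like "1024M"
    conf.set("mapred.reduce.child.java.opts", "-Xmx1024m");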

Stack trace

2012-10-10 21:15:03,929 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 238695                                                                                                   
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 13                                                                                                       
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,191 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,193 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 673                                                                                                       
2012-10-10 21:15:04,195 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:09,856 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-10-10 21:15:09,916 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2012-10-10 21:15:09,916 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName hduser for UID 2006 from the native implementation
2012-10-10 21:15:09,922 FATAL org.apache.hadoop.mapred.Child: Error running child : java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2882)                                                                                                                                                      
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:100)                                                                                                                 
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:390)                                                                                                                         
    at java.lang.StringBuilder.append(StringBuilder.java:119)                                                                                                                                         
    at partitioning.UndirectedGraphPartitioner$Reduce.reduce(UndirectedGraphPartitioner.java:106)                                                                                            
    at partitioning.UndirectedGraphPartitioner$Reduce.reduce(UndirectedGraphPartitioner.java:82)                                                                                             
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519)                                                                                                                         
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)                                                                                                                                   
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)                                                                                                                                           
    at java.security.AccessController.doPrivileged(Native Method)                                                                                                                                     
    at javax.security.auth.Subject.doAs(Subject.java:396)                                                                                                                                             
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)                                                                                                           
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

1 Answer

Answered by 地球回转人心会变 · 2019-09-09 18:06

So for your example, you want to output the values for a particular key as a space-separated list (as the output key), with an empty Text as the output value. Note that accumulating everything into one StringBuilder forces its backing array to be repeatedly doubled and copied (the expandCapacity / Arrays.copyOf frames in your trace), so a key with many values can transiently need far more heap than the finished string. The fix is to stream the values out rather than buffer them.

Your output format would consume the reduce key/values as follows (this goes in your reducer code):

    // Emit each value individually; the custom record writer described
    // below stitches values for the same key back into one line.
    while (values.hasNext()) {
        multipleOutputs.getCollector("adjvertex", reporter)
            .collect(key, values.next());
    }

The actual RecordWriter would then use the key as a logic trigger:

When a key arrives that differs from the previously seen key, the record currently being written is closed out (write a tab followed by a newline, for example). The tracked previous key is updated and the new value is written to the output stream.

If the key is the same as the previous key, a space followed by the value is written to the output stream.

In the RecordWriter's close method, perform the same close-out logic as if a new key were being passed (output a tab followed by a newline).

Hope this makes sense. The only thing you need to be careful of is a custom group comparator, which would cause the previous-key comparison in the RecordWriter to fail. Also remember to make a deep copy of the key when updating the previous-key tracking variable. A sketch follows below.
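
Here is a minimal sketch of such an output format against the old mapred API. The class names are illustrative (not from your code), it assumes Text keys and values, and error handling is omitted:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.util.Progressable;

    // Joins consecutive values that share a key into one space-separated
    // line, so nothing has to be buffered in the reducer's memory.
    public class AdjacencyOutputFormat extends FileOutputFormat<Text, Text> {

        @Override
        public RecordWriter<Text, Text> getRecordWriter(FileSystem ignored, JobConf job,
                String name, Progressable progress) throws IOException {
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FSDataOutputStream out = file.getFileSystem(job).create(file, progress);
            return new AdjacencyRecordWriter(out);
        }

        static class AdjacencyRecordWriter implements RecordWriter<Text, Text> {
            private final DataOutputStream out;
            private Text previousKey = null; // deep copy of the last key seen

            AdjacencyRecordWriter(DataOutputStream out) {
                this.out = out;
            }

            public synchronized void write(Text key, Text value) throws IOException {
                if (previousKey == null) {
                    previousKey = new Text(key);  // first record: deep copy, not a reference
                } else if (previousKey.equals(key)) {
                    out.writeBytes(" ");          // same key: continue the current line
                } else {
                    out.writeBytes("\t\n");       // new key: close out the previous record
                    previousKey.set(key);         // copies the bytes (another deep copy)
                }
                out.write(value.getBytes(), 0, value.getLength());
            }

            public synchronized void close(Reporter reporter) throws IOException {
                if (previousKey != null) {
                    out.writeBytes("\t\n");       // close out the final record
                }
                out.close();
            }
        }
    }

You would then register it against the named output in your driver, e.g. MultipleOutputs.addNamedOutput(conf, "adjvertex", AdjacencyOutputFormat.class, Text.class, Text.class).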
