I am getting OOM exception (Java heap space) for reduce child. In the reducer, I am appending all the values to a StringBuilder which would be the output of the reducer process. The number of values aren't that many. I tried to increase the value of mapred.reduce.child.java.opts
to 512M and 1024M but that doesn't help. Reducer code is given below.
StringBuilder adjVertexStr = new StringBuilder();
long itcount= 0;
while(values.hasNext()) {
adjVertexStr.append(values.next().toString()).append(" ");
itcount++;
}
log.info("Size of iterator: " + itcount);
multipleOutputs.getCollector("vertex", reporter).collect(key, new Text(""));
multipleOutputs.getCollector("adjvertex", reporter).collect(adjVertexStr, new Text(""));
I get exceptions at 3 places in the above code.
- In the exception stacktrace, the line number points to the while loop statement which appends strings.
- In the last line - collect() statement.
- I had a set accumulating all the values - so that there are no duplicate values. I removed it later on.
Some sample sizes of iterator are as follows: 238695, 1, 13, 673, 1, 1 etc. These are not very large values. Why do I keep getting the OOM exception? Any help would be valuable to me.
Stack trace
2012-10-10 21:15:03,929 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 238695
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 13
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1
2012-10-10 21:15:04,191 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1
2012-10-10 21:15:04,193 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 673
2012-10-10 21:15:04,195 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1
2012-10-10 21:15:09,856 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs` truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-10-10 21:15:09,916 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2012-10-10 21:15:09,916 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName hduser for UID 2006 from the native implementation
2012-10-10 21:15:09,922 FATAL org.apache.hadoop.mapred.Child: Error running child : java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2882)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:100)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:390)
at java.lang.StringBuilder.append(StringBuilder.java:119)
at partitioning.UndirectedGraphPartitioner$Reduce.reduce(UndirectedGraphPartitioner.java:106)
at partitioning.UndirectedGraphPartitioner$Reduce.reduce(UndirectedGraphPartitioner.java:82)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
So for your example, you want to output the values for a particular key as a space separated list of the values (as the output key), and an empty text as the output value.
Your output format for this would consume the reduce key / values as follows (this would be in your reducer code):
The actual recordWriter would then use the key as a logic trigger:
When a key is passed that is different to the previously passed key, the previous record being written would be closed out (write a tab followed by a newline for example). The previous key would be updated and the new value written out to the output stream.
If the key is the same as the previous key, then output a space followed by the value to the output stream.
In the close method for the record writer, perform the same logic as if a new key was being passed (output a tab, followed by a newline).
Hope this makes sense. The only thing you need to be careful of is if you have a custom group comparator (which will cause the previous key comparison in the record writer to fail). Also remember to make a deep copy of the key when updating the previous key tracking variable.