Hadoop streaming - remove trailing tab from reduce

Posted 2019-02-06 21:13

Question:

I have a hadoop streaming job whose output does not contain key/value pairs. You can think of it as value-only pairs or key-only pairs.

My streaming reducer (a php script) is outputting records separated by newlines. Hadoop streaming treats this as a key with no value, and inserts a tab before the newline. This extra tab is unwanted.

How do I remove it?

I am using hadoop 1.0.3 with AWS EMR. I downloaded the source of hadoop 1.0.3 and found this code in hadoop-1.0.3/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java :

reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");

So I tried passing -D stream.reduce.output.field.separator= as an argument to the job with no luck. I also tried -D mapred.textoutputformat.separator= and -D mapreduce.output.textoutputformat.separator= with no luck.

I've searched Google, of course, and nothing I found worked. One search result even stated that no argument could be passed to achieve the desired result (though the Hadoop version in that case was very old).

Here is my code (with added line breaks for readability):

hadoop jar streaming.jar -files s3n://path/to/a/file.json#file.json
    -D mapred.output.compress=true -D stream.reduce.output.field.separator=
    -input s3n://path/to/some/input/*/* -output hdfs:///path/to/output/dir
    -mapper 'php my_mapper.php' -reducer 'php my_reducer.php'

Answer 1:

Looking at the org.apache.hadoop.mapreduce.lib.output.TextOutputFormat source, I see 2 things:

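  1. The write(key,value) method writes the separator whenever both key and value are non-null. Streaming passes an empty (but non-null) value for a key-only line, so the separator is still written.
  2. The separator always falls back to the default (\t) when the lookup of mapred.textoutputformat.separator returns null (which I'm assuming is what happens with an empty setting such as -D stream.reduce.output.field.separator=).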

Your only solution may be to write your own OutputFormat that works around these 2 issues.
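
To make the write logic concrete, here is a minimal Java sketch with no Hadoop dependency. It is a simplified stand-in written from my reading of the Hadoop 1.x source, not a copy of it; LineWriterSketch, formatRecord and visible are made-up names, and it assumes streaming hands over a key-only line as a key plus an empty, non-null value.

```java
// Simplified stand-in for the write logic discussed above; not the Hadoop source.
class LineWriterSketch {

    // Builds the text that would be written for one record.
    static String formatRecord(String key, String value, String separator) {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return ""; // nothing is written at all
        }
        StringBuilder out = new StringBuilder();
        if (!nullKey) {
            out.append(key);
        }
        if (!nullKey && !nullValue) {
            out.append(separator); // emitted even when value is the empty string
        }
        if (!nullValue) {
            out.append(value);
        }
        return out.append('\n').toString();
    }

    // Makes tabs and newlines visible when printing.
    static String visible(String s) {
        return s.replace("\t", "\\t").replace("\n", "\\n");
    }

    public static void main(String[] args) {
        // Key-only line with an empty value: the separator is still appended.
        System.out.println(visible(formatRecord("foo,bar,baz", "", "\t")));
        // Truly null value: no separator.
        System.out.println(visible(formatRecord("foo,bar,baz", null, "\t")));
        // Blank separator: nothing visible is appended.
        System.out.println(visible(formatRecord("foo,bar,baz", "", "")));
    }
}
```

In this model only the first call produces a trailing tab, which is why either a truly null value or a blank separator would get rid of it.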

My testing

In a task I had, I wanted to reformat a line from

id1|val1|val2|val3
id2|val1

into:

id1|val1,val2,val3
id2|val1

I had a custom mapper (a Perl script) to convert the lines. For this task I initially tried to treat the input as key-only (or value-only), but the results came out with the trailing tab.
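
The mapper itself is not shown here. A rough equivalent of the conversion, written in Java rather than Perl purely for illustration, might look like the sketch below. ReformatSketch and reformatLine are hypothetical names, and the handling of lines without a '|' is my own assumption.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Illustrative stand-in for the line-reformatting mapper; not the original script.
class ReformatSketch {

    // Keeps the first '|' (id separator) and turns the remaining ones into commas.
    static String reformatLine(String line) {
        int firstBar = line.indexOf('|');
        if (firstBar < 0) {
            return line; // assumption: lines without '|' pass through unchanged
        }
        String id = line.substring(0, firstBar);
        String values = line.substring(firstBar + 1).replace('|', ',');
        return id + "|" + values;
    }

    // Streaming-style loop: read lines from stdin, write converted lines to stdout.
    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8));
        String line;
        while ((line = in.readLine()) != null) {
            System.out.println(reformatLine(line));
        }
    }
}
```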

At first I just specified:

-D stream.map.input.field.separator='|' -D stream.map.output.field.separator='|'

This gave the mapper a key/value pair, which suited me since my mapping wanted a key anyway. But the output now had a tab after the first field.

I got the desired output when I added:

-D mapred.textoutputformat.separator='|'

If I didn't set it, or set it to blank:

-D mapred.textoutputformat.separator=

then I would again get a tab after the first field.

It made sense once I looked at the source for TextOutputFormat.



Answer 2:

In case it helps others: using the tips above, I was able to put together an implementation:

CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}

with exactly one line of the built-in implementation of 'getRecordWriter' changed to:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); 

instead of:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t"); 

After compiling that into a jar and including it in my hadoop streaming call (following the hadoop streaming instructions), the call looked like:

hadoop   jar  /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar     \
-archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \
-libjars theNameOfTheJar.jar \
-outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat  \
-file yourMapper.py    -mapper  yourMapper.py     \
-file yourReducer.py   -reducer yourReducer.py    \
-input $yourInputFile    \
-output $yourOutputDirectoryOnHDFS

I also included the jar in the folder I issued that call from.

It was working great for my needs (and it created no tabs at the end of the line after the reducer).


Update: based on a comment implying this is indeed helpful for others, here's the full source of my CustomOutputFormat.java file:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
            Progressable progress) throws IOException {
        boolean isCompressed = getCompressOutput(job);

        // Changing the default separator from '\t' to blank
        String keyValueSeparator = job.get("mapred.textoutputformat.separator", "");
        if (!isCompressed) {
            // plain output: write straight to the task output file
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
                    GzipCodec.class);
            // create the named codec
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
            // build the filename including the extension
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new LineRecordWriter<K, V>(new DataOutputStream(
                    codec.createOutputStream(fileOut)), keyValueSeparator);
        }
    }
}

FYI: For your usage context, be sure to check this does not adversely affect hadoop-streaming managed interactions (in terms of separating key vs. value) between your mapper and reducer. To clarify:

  • From my testing: if every line of your data contains a tab (with something on each side of it), you can leave the built-in defaults as they are. Streaming treats everything before the first tab as your 'key' and everything after it on that row as your 'value'. As such, it does not see a 'null value' and won't append a tab after your reducer. (Your final output will be sorted on whatever streaming takes to be the 'key', i.e. the text before the first tab in each row.)

  • Conversely, if you have no tabs in your data and you don't override the defaults using the trick(s) above, you'll see trailing tabs once the run completes; the override above fixes that.
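
The key/value split described in these two bullets can be sketched as follows. This is only a model of the behaviour as I understand it, not streaming's actual code; SplitSketch and splitKeyValue are hypothetical names.

```java
// Models how a line is divided into key and value at the first separator.
class SplitSketch {

    // Returns {key, value}. Without a separator the whole line is the key
    // and the value is the empty string (assumed: empty, not null).
    static String[] splitKeyValue(String line, String separator) {
        int pos = line.indexOf(separator);
        if (pos < 0) {
            return new String[] { line, "" };
        }
        return new String[] {
            line.substring(0, pos),
            line.substring(pos + separator.length())
        };
    }

    public static void main(String[] args) {
        // Only the first tab splits; later tabs stay inside the value.
        String[] withTab = splitKeyValue("id1\tval1\tval2", "\t");
        System.out.println("key=[" + withTab[0] + "] value=[" + withTab[1].replace("\t", "\\t") + "]");

        // No tab at all: the entire line becomes the key.
        String[] noTab = splitKeyValue("foo,bar,baz", "\t");
        System.out.println("key=[" + noTab[0] + "] value=[" + noTab[1] + "]");
    }
}
```

The second case is the one that ends up with the trailing tab once the key, the default separator and the empty value are written back out.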



Answer 3:

I too had this problem. I was running a map-only Python job that was basically just emitting lines of CSV data. After examining the output, I noticed the \t on the end of every line.

 foo,bar,baz\t

What I discovered is that the mapper and the Python stream are both dealing with key/value pairs. If you don't emit the default separator, the whole line of CSV data is considered the "key", and the framework, which requires a key and a value, slaps on a \t and an empty value.

Since my data was essentially a CSV string, I set the separator for both stream and mapred output to comma. The framework read everything up to the first comma as the key and everything after the first comma as the value. Then when it wrote out the results to file it wrote key comma value which effectively created the output I was after.

 foo,bar,baz
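
Put differently, the comma trick works because the line is split at its first comma and then joined again with the same character. A small Java sketch of that round trip follows; CommaRoundTripSketch and roundTrip are hypothetical names, and it assumes a line without the separator becomes a key with an empty value, as described above.

```java
// Models "split at the first separator, then write key + separator + value".
class CommaRoundTripSketch {

    static String roundTrip(String line, String separator) {
        int pos = line.indexOf(separator);
        String key = pos < 0 ? line : line.substring(0, pos);
        String value = pos < 0 ? "" : line.substring(pos + separator.length());
        // Assumption: the same separator is used for splitting and for output.
        return key + separator + value;
    }

    public static void main(String[] args) {
        // Comma separator: the line comes back unchanged.
        System.out.println(roundTrip("foo,bar,baz", ","));
        // Default tab separator: a trailing tab appears (shown as \t).
        System.out.println(roundTrip("foo,bar,baz", "\t").replace("\t", "\\t"));
    }
}
```

One consequence of this model: a line with no comma at all would come out with a trailing comma, so the trick relies on every row having at least two fields.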

In my case, I added the below to prevent the framework from adding \t to the end of my csv output...

-D mapred.reduce.tasks=0 \
-D stream.map.output.field.separator=, \
-D mapred.textoutputformat.separator=, \