Using files in Hadoop Streaming with Python

Posted 2019-07-22 02:51

Question:

I am completely new to Hadoop and MapReduce and am trying to work my way through them. I am trying to develop a MapReduce application in Python that uses data from two .csv files. The mapper just reads the two files and prints the key-value pairs from them to sys.stdout.

The program runs fine on a single machine, but with Hadoop Streaming I get an error. I think I am making a mistake in how the mapper reads files on Hadoop. Please help me with the code and tell me how to do file handling in Hadoop Streaming. The mapper.py code is below (the comments explain it):

#!/usr/bin/env python
import sys
from numpy import genfromtxt

def read_input(inVal):
    for line in inVal:
        # strip surrounding whitespace, including the trailing newline
        yield line.strip()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    labels=[]
    data=[]    
    incoming = read_input(sys.stdin)
    for vals in incoming:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited;
        if len(vals) > 10:
            data.append(vals)
        else:
            labels.append(vals)

    for i in range(0,len(labels)):
        print "%s%s%s\n" % (labels[i], separator, data[i])


if __name__ == "__main__":
    main()

The mapper is fed 60,000 records from the two .csv files as follows (on a single machine, not a Hadoop cluster):

cat mnist_train_labels.csv mnist_train_data.csv | ./mapper.py
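
The pairing step in mapper.py only works when every label arrives on the same input stream as its data row, which the `cat` pipeline above guarantees. The following is a minimal sketch of that step as a standalone function, under stated assumptions: `pair_records` and `min_data_len` are hypothetical names that are not in the original script, blank lines are skipped (the original would count them as labels), and the count check is an addition. Whether a label/data mismatch is what actually happened on the cluster cannot be told from the post; this is one plausible cause, since Hadoop Streaming hands each mapper only the lines of its own input split.

```python
def pair_records(lines, separator="\t", min_data_len=10):
    """Pair label lines with data lines by position.

    Follows the original heuristic: a line longer than `min_data_len`
    characters is treated as a data row, anything else as a label.
    """
    labels, data = [], []
    for raw in lines:
        line = raw.strip()
        if not line:
            continue  # skip blank lines (an assumption, see lead-in)
        if len(line) > min_data_len:
            data.append(line)
        else:
            labels.append(line)

    # Fail with a clear message instead of an IndexError inside the loop.
    if len(labels) != len(data):
        raise ValueError(
            "got %d labels but %d data rows" % (len(labels), len(data))
        )
    return ["%s%s%s" % (lab, separator, row) for lab, row in zip(labels, data)]
```

In a mapper, this could be driven with `for record in pair_records(sys.stdin): print(record)`. A mapper that receives only the labels file would then stop with a readable message rather than a bare index error.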

Answer 1:

I was able to resolve the issue after searching for a solution for about 3 days.

The problem is with the newer version of Hadoop (2.2.0 in my case). While reading values from the files, the mapper code exited with a non-zero exit code at some point (maybe because it was reading a huge list of values (784) at a time). Hadoop 2.2.0 has a setting that tells Hadoop to report this as a general error (subprocess failed with code 1). The setting is true by default. I just had to set this property to false, and my code ran without any errors.

The setting is stream.non.zero.exit.is.failure. Just set it to false when streaming, so the streaming command looks something like:

**hadoop jar ... -D stream.non.zero.exit.is.failure=false ...**

Hope it helps someone, and saves 3 days... ;)
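
Since the failure described above comes from the mapper exiting with a non-zero code, it can help to see what made it exit. The following is a minimal sketch, assuming the mapper logic is wrapped in a callable that takes the input stream; `run_mapper` is a hypothetical helper, not part of Hadoop Streaming or of the original script. It prints the Python traceback to stderr, which Hadoop captures in the task's stderr log, and returns the exit code.

```python
import sys
import traceback


def run_mapper(mapper, stdin=None, stderr=None):
    """Run `mapper(stdin)` and return a process exit code (0 or 1)."""
    stdin = sys.stdin if stdin is None else stdin
    stderr = sys.stderr if stderr is None else stderr
    try:
        mapper(stdin)
    except Exception:
        # Write the full traceback so the real error shows up in the task log.
        traceback.print_exc(file=stderr)
        return 1
    return 0
```

A script would end with something like `sys.exit(run_mapper(my_mapper))`, where `my_mapper` is a placeholder for a function that reads lines from the stream it is given. With stream.non.zero.exit.is.failure left at its default, the task still fails, but the log then says why.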



Answer 2:

You didn't post your error. In streaming you need to pass either the -file option or -input, so that the file is either uploaded with your streaming job or Hadoop knows where to find it on HDFS.
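
To illustrate the -file route: a file shipped with -file should be available in the task's working directory under its base name, so the mapper can open it with a relative path. The following is a minimal sketch under that assumption; `load_side_file` and its `directory` parameter are hypothetical names used only for illustration.

```python
import os


def load_side_file(name, directory=None):
    """Return the stripped, non-empty lines of a file shipped with the job."""
    # Default to the current working directory, where shipped files are
    # assumed to be placed for the task.
    directory = os.getcwd() if directory is None else directory
    path = os.path.join(directory, name)
    with open(path) as handle:
        return [line.strip() for line in handle if line.strip()]
```

For the question's data, a mapper could call `load_side_file("mnist_train_labels.csv")` once at startup and read only the data rows from sys.stdin. Matching each label to its row would still need a shared key or row number, which this sketch does not provide.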