Python hadoop on windows cmd, one mapper and multi

Posted 2019-08-28 21:36

Question:

I want to run a Python script for machine learning on Hadoop. It takes two input files (train and test), both of which the learning process needs. I have no reducer.

I have three doubts about my command:

  1. To pass two input files, I used -input file1 -input file2, following "Using multiple mapper inputs in one streaming job on hadoop?".
  2. To turn off the reduce phase, I used -D mapred.reduce.tasks=0, following "How to write 'map only' hadoop jobs?".
  3. How do I flush sys.stdin when I want to read the next input? For now I call sys.stdout.flush() in my Python code.
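
A note on points 1 and 3: as far as Hadoop Streaming is documented, each map task runs its own copy of the mapper and feeds it one input split on stdin, so a single process would not receive train.csv followed by test.csv, and flushing cannot switch stdin to another file. Streaming does export the job configuration as environment variables (dots become underscores), so a mapper can at least check which file its split belongs to. A minimal sketch, assuming the variable is mapreduce_map_input_file (map_input_file on older releases); the function names and the train/test tagging are hypothetical, not part of the original script:

```python
import os
import sys


def current_input_name(environ=os.environ):
    """Return the base name of the file this map task is reading, or ''."""
    # Assumption: Hadoop Streaming sets mapreduce_map_input_file
    # (older releases: map_input_file) to the URI of the current split's file.
    path = environ.get("mapreduce_map_input_file") or environ.get("map_input_file", "")
    # The value looks like hdfs://host:9000/in/train.csv; keep the last segment.
    return path.replace("\\", "/").rsplit("/", 1)[-1]


def tag_records(lines, source):
    """Prefix each non-empty record with its source so later steps can tell them apart."""
    for line in lines:
        line = line.rstrip("\r\n")
        if line:
            yield "%s\t%s" % (source, line)


def main(stdin=sys.stdin, stdout=sys.stdout):
    name = current_input_name()
    # Hypothetical rule: anything not named train* is treated as test data.
    source = "train" if name.startswith("train") else "test"
    for record in tag_records(stdin, source):
        stdout.write(record + "\n")
```

Calling main() at the bottom of the mapper would emit every CSV row prefixed with "train" or "test". Training one model on both files would still need both of them in a single process, which a map-only job over two inputs does not provide on its own.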

This is my command in the Windows command prompt (cmd):

D:\hadoop\bin\hadoop jar D:\hadoop\share\hadoop\tools\lib\hadoop-streaming-2.3.0.jar 
-D mapred.reduce.tasks=0
-file /in/Deeplearning.py -mapper "python Deeplearning.py" 
-input /in/train.csv -input /in/test.csv 
-output /output

My environment: Windows 10 64-bit, Python 3.6 (IDE: Spyder 3.2.6), Hadoop 2.7.2, Java JDK 1.8.0_161.

This is my python code:

import sys
import numpy as np
from numpy.random import permutation
import pandas as pd
import tflearn
from tflearn.layers.core import input_data,dropout,fully_connected
from tflearn.layers.conv import conv_2d
from tflearn.layers.normalization import local_response_normalization
from tflearn.layers.estimator import regression
# sklearn.cross_validation was removed in scikit-learn 0.20; model_selection replaces it
from sklearn.model_selection import train_test_split


def split_matrices_into_random_train_test_subsets():
    train = pd.read_csv(sys.stdin)
    train = np.array(train)
    train = permutation(train)
    X = train[:,1:785].astype(np.float32) 
    y = train[:,0].astype(np.float32)
    return train_test_split(X, y, test_size=0.33, random_state=42)

def reshape_data(Data,Labels):
    Data = Data.reshape(-1,28,28,1).astype(np.float32)
    Labels = (np.arange(10) == Labels[:,None]).astype(np.float32)
    return Data,Labels

X_train, X_test, y_train, y_test = split_matrices_into_random_train_test_subsets()

X_train,y_train = reshape_data(X_train,y_train)
X_test,y_test = reshape_data(X_test,y_test)
sys.stdout.flush() 
test_x = np.array(pd.read_csv(sys.stdin))
test_x = test_x.reshape(-1,28,28,1)

def Convolutional_neural_network():
    network  = input_data(shape=[None,28,28,1],name='input_layer')
    network  = conv_2d(network, nb_filter=6,  filter_size=6, strides=1, activation='relu', regularizer='L2')  
    network  = local_response_normalization(network)
    network  = conv_2d(network, nb_filter=12, filter_size=5, strides=2, activation='relu', regularizer='L2') 
    network  = local_response_normalization(network)
    network  = conv_2d(network, nb_filter=24, filter_size=4, strides=2, activation='relu', regularizer='L2')
    network  = local_response_normalization(network)    

    network = fully_connected(network, 128, activation='tanh')
    network = dropout(network, 0.8)
    network = fully_connected(network, 256, activation='tanh')
    network = dropout(network, 0.8) 
    network = fully_connected(network, 10, activation='softmax') 

    sgd   = tflearn.SGD(learning_rate=0.1,lr_decay=0.096,decay_step=100)
    top_k = tflearn.metrics.top_k(3) 

    network = regression(network, optimizer=sgd, metric=top_k, loss='categorical_crossentropy')
    return tflearn.DNN(network, tensorboard_dir='tf_CNN_board', tensorboard_verbose=3)

model = Convolutional_neural_network()
model.fit(X_train, y_train, batch_size=128, validation_set=(X_test,y_test), n_epoch=1, show_metric=True)

P = model.predict(test_x)

# 1-based image ids and the most likely class for each prediction
index = list(range(1, len(P) + 1))
result = [int(np.argmax(p)) for p in P]

res = pd.DataFrame({'ImageId':index,'Label':result})
print('%s\t' % (res))
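
One detail about the final print: Hadoop Streaming treats each line written to the mapper's stdout as one record, and by default the text up to the first tab is the key. Printing a whole DataFrame writes its display form, which pandas may shorten for long frames. A minimal sketch of writing one "ImageId<TAB>Label" line per prediction instead; format_predictions and emit are hypothetical helper names, not part of the original script:

```python
import sys


def format_predictions(labels, start_id=1):
    """Yield one 'ImageId<TAB>Label' line per predicted label."""
    for offset, label in enumerate(labels):
        yield "%d\t%d" % (start_id + offset, int(label))


def emit(lines, out=sys.stdout):
    """Write each record on its own line and flush once at the end."""
    for line in lines:
        out.write(line + "\n")
    out.flush()
```

With the variables from the script above, emit(format_predictions(result)) would take the place of the DataFrame print.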

My Python code works properly on its own; I just want to run it in the Hadoop ecosystem. When I ran the command, I got this output:

D:\hadoop\sbin>D:\hadoop\bin\hadoop jar D:\hadoop\share\hadoop\tools\lib\hadoop-streaming-2.3.0.jar -D mapred.reduce.tasks=0 -file /in/Deeplearning.py -mapper "python Deeplearning.py" -input /in/train.csv -input /in/test.csv -output /output12
18/02/24 22:09:37 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/in/Deeplearning.py, /D:/tmp/hadoop-Mahsa/hadoop-unjar6766109988740119098/] [] C:\Users\Mahsa\AppData\Local\Temp\streamjob6727979790632634187.jar tmpDir=null
18/02/24 22:09:39 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/02/24 22:09:39 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/02/24 22:09:41 INFO mapred.FileInputFormat: Total input paths to process : 2
18/02/24 22:09:41 INFO mapreduce.JobSubmitter: number of splits:3
18/02/24 22:09:41 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
18/02/24 22:09:42 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1519494353792_0004
18/02/24 22:09:42 INFO impl.YarnClientImpl: Submitted application application_1519494353792_0004
18/02/24 22:09:42 INFO mapreduce.Job: The url to track the job: http://Mahsa:8088/proxy/application_1519494353792_0004/
18/02/24 22:09:42 INFO mapreduce.Job: Running job: job_1519494353792_0004
18/02/24 22:09:55 INFO mapreduce.Job: Job job_1519494353792_0004 running in uber mode : false
18/02/24 22:09:55 INFO mapreduce.Job:  map 0% reduce 0%
18/02/24 22:11:09 INFO mapreduce.Job:  map 75% reduce 0%
18/02/24 22:11:09 INFO mapreduce.Job: Task Id : attempt_1519494353792_0004_m_000002_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

18/02/24 22:11:10 INFO mapreduce.Job:  map 42% reduce 0%
18/02/24 22:11:12 INFO mapreduce.Job:  map 64% reduce 0%
18/02/24 22:11:15 INFO mapreduce.Job:  map 67% reduce 0%
18/02/24 22:11:24 INFO mapreduce.Job: Task Id : attempt_1519494353792_0004_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

18/02/24 22:11:25 INFO mapreduce.Job:  map 33% reduce 0%
18/02/24 22:11:28 INFO mapreduce.Job: Task Id : attempt_1519494353792_0004_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

18/02/24 22:11:29 INFO mapreduce.Job:  map 0% reduce 0%
18/02/24 22:12:38 INFO mapreduce.Job:  map 21% reduce 0%
18/02/24 22:12:39 INFO mapreduce.Job:  map 74% reduce 0%
18/02/24 22:12:39 INFO mapreduce.Job: Task Id : attempt_1519494353792_0004_m_000002_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

18/02/24 22:12:40 INFO mapreduce.Job:  map 41% reduce 0%
18/02/24 22:12:41 INFO mapreduce.Job:  map 53% reduce 0%
18/02/24 22:12:42 INFO mapreduce.Job:  map 65% reduce 0%
18/02/24 22:12:47 INFO mapreduce.Job:  map 67% reduce 0%
18/02/24 22:12:54 INFO mapreduce.Job: Task Id : attempt_1519494353792_0004_m_000001_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

18/02/24 22:12:55 INFO mapreduce.Job:  map 33% reduce 0%
18/02/24 22:12:56 INFO mapreduce.Job:  map 34% reduce 0%
18/02/24 22:12:57 INFO mapreduce.Job: Task Id : attempt_1519494353792_0004_m_000000_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

18/02/24 22:12:58 INFO mapreduce.Job:  map 0% reduce 0%
18/02/24 22:14:11 INFO mapreduce.Job:  map 29% reduce 0%
18/02/24 22:14:12 INFO mapreduce.Job:  map 62% reduce 0%
18/02/24 22:14:12 INFO mapreduce.Job: Task Id : attempt_1519494353792_0004_m_000002_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

18/02/24 22:14:13 INFO mapreduce.Job:  map 29% reduce 0%
18/02/24 22:14:14 INFO mapreduce.Job:  map 56% reduce 0%
18/02/24 22:14:18 INFO mapreduce.Job:  map 67% reduce 0%
18/02/24 22:14:25 INFO mapreduce.Job: Task Id : attempt_1519494353792_0004_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

18/02/24 22:14:26 INFO mapreduce.Job:  map 33% reduce 0%
18/02/24 22:14:31 INFO mapreduce.Job:  map 34% reduce 0%
18/02/24 22:14:31 INFO mapreduce.Job: Task Id : attempt_1519494353792_0004_m_000000_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

18/02/24 22:14:32 INFO mapreduce.Job:  map 0% reduce 0%
18/02/24 22:14:46 INFO mapreduce.Job:  map 33% reduce 0%
18/02/24 22:14:47 INFO mapreduce.Job:  map 100% reduce 0%
18/02/24 22:15:02 INFO mapreduce.Job: Job job_1519494353792_0004 failed with state FAILED due to: Task failed task_1519494353792_0004_m_000002
Job failed as tasks failed. failedMaps:1 failedReduces:0

18/02/24 22:15:02 INFO mapreduce.Job: Counters: 13
        Job Counters
                Failed map tasks=10
                Killed map tasks=2
                Launched map tasks=12
                Other local map tasks=9
                Data-local map tasks=3
                Total time spent by all maps in occupied slots (ms)=857100
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=857100
                Total vcore-seconds taken by all map tasks=857100
                Total megabyte-seconds taken by all map tasks=877670400
        Map-Reduce Framework
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
18/02/24 22:15:02 ERROR streaming.StreamJob: Job not Successful!
Streaming Command Failed!
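
The "subprocess failed with code 1" lines only say that the mapper process exited with status 1, which is what Python does after an uncaught exception. The Python traceback itself goes to the task attempt's stderr log (reachable from the tracking URL above), not to this console output. A small sketch of a wrapper that makes sure the traceback is written and flushed before the process exits; run_guarded is a hypothetical name, and the heavy imports (tflearn, pandas) would have to happen inside the wrapped function for import errors to be caught too:

```python
import sys
import traceback


def run_guarded(task, err=sys.stderr):
    """Run task(); on failure write the traceback to err and return 1, else 0."""
    try:
        task()
    except Exception:
        # The full traceback ends up in the task's stderr log.
        traceback.print_exc(file=err)
        err.flush()
        return 1
    return 0
```

A mapper could end with sys.exit(run_guarded(main)), where main is whatever function holds the script's logic.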

Thank you in advance for any help or ideas.