Spark runs in local but can't find file when r

Posted 2019-07-09 00:52

Question:

I've been trying to submit a simple Python script to run on a cluster with YARN. When I run the job locally there is no problem and everything works fine, but when I run it on the cluster it fails.

I submitted the job with the following command:

spark-submit --master yarn --deploy-mode cluster test.py

The error log I'm receiving is the following:

17/11/07 13:02:48 INFO yarn.Client: Application report for application_1510046813642_0010 (state: ACCEPTED)
17/11/07 13:02:49 INFO yarn.Client: Application report for application_1510046813642_0010 (state: ACCEPTED)
17/11/07 13:02:50 INFO yarn.Client: Application report for application_1510046813642_0010 (state: FAILED)
17/11/07 13:02:50 INFO yarn.Client: 
     client token: N/A
     diagnostics: Application application_1510046813642_0010 failed 2 times due to AM Container for appattempt_1510046813642_0010_000002 exited with  exitCode: -1000
For more detailed output, check application tracking page:http://myserver:8088/proxy/application_1510046813642_0010/Then, click on links to logs of each attempt.
**Diagnostics: File does not exist: hdfs://myserver:8020/user/josholsan/.sparkStaging/application_1510046813642_0010/test.py**
java.io.FileNotFoundException: File does not exist: hdfs://myserver:8020/user/josholsan/.sparkStaging/application_1510046813642_0010/test.py
    at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1266)
    at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1258)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1258)
    at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:251)
    at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Failing this attempt. Failing the application.
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: root.users.josholsan
     start time: 1510056155796
     final status: FAILED
     tracking URL: http://myserver:8088/cluster/app/application_1510046813642_0010
     user: josholsan
Exception in thread "main" org.apache.spark.SparkException: Application application_1510046813642_0010 finished with failed status
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1025)
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1072)
    at org.apache.spark.deploy.yarn.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/11/07 13:02:50 INFO util.ShutdownHookManager: Shutdown hook called
17/11/07 13:02:50 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-5cc8bf5e-216b-4d9e-b66d-9dc01a94e851

I paid special attention to this line:

Diagnostics: File does not exist: hdfs://myserver:8020/user/josholsan/.sparkStaging/application_1510046813642_0010/test.py

I don't know why it can't find test.py. I also tried putting it in HDFS under the home directory of the user executing the job: /user/josholsan/

To finish my post, I would also like to share my test.py script:

from pyspark import SparkContext

file="/user/josholsan/concepts_copy.csv"
sc = SparkContext("local","Test app")
textFile = sc.textFile(file).cache()

linesWithOMOP=textFile.filter(lambda line: "OMOP" in line).count()
linesWithICD=textFile.filter(lambda line: "ICD" in line).count()

print("Lines with OMOP: %i, lines with ICD9: %i" % (linesWithOMOP,linesWithICD))

Could the error also be here?

sc = SparkContext("local","Test app")

Thank you so much in advance for your help.

Answer 1:

Transferred from the comments section:

  • sc = SparkContext("local","Test app"): hard-coding "local" as the master here overrides any master passed on the command line (such as --master yarn); from the docs:

    Any values specified as flags or in the properties file will be passed on to the application and merged with those specified through SparkConf. Properties set directly on the SparkConf take highest precedence, then flags passed to spark-submit or spark-shell, then options in the spark-defaults.conf file.

  • The test.py file must be placed somewhere visible to the whole cluster, e.g. spark-submit --master yarn --deploy-mode cluster http://somewhere/accessible/to/master/and/workers/test.py
  • Any additional files and resources can be specified using the --py-files argument (tested on Mesos, unfortunately not on YARN), e.g. --py-files http://somewhere/accessible/to/all/extra_python_code_my_code_uses.zip

    Edit: as @desertnaut commented, this argument must come before the script to be executed.

  • yarn logs -applicationId <app ID> will give you the output of your submitted job.
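
To make the precedence rule quoted in the first point concrete, here is a small toy model in plain Python. It is only a sketch of the ordering described in the docs, not Spark's actual implementation; the function name resolve_property and the three dictionaries are made up for illustration, and it assumes that --master yarn ends up as the spark.master property.

```python
def resolve_property(key, spark_conf, submit_flags, defaults_file):
    """Return the effective value for key, checking sources in precedence order.

    Order taken from the quoted docs: values set in code (SparkConf) win,
    then spark-submit flags, then spark-defaults.conf.
    """
    for source in (spark_conf, submit_flags, defaults_file):
        if key in source:
            return source[key]
    return None


# The situation in the question: "local" in code, "yarn" on the command line.
flags = {"spark.master": "yarn"}
with_hardcoded_master = resolve_property(
    "spark.master", {"spark.master": "local"}, flags, {}
)
without_hardcoded_master = resolve_property("spark.master", {}, flags, {})

print(with_hardcoded_master)     # local
print(without_hardcoded_master)  # yarn
```

In the script itself, this corresponds to dropping the "local" argument, for example sc = SparkContext(appName="Test app"), so that the master given to spark-submit is the one that takes effect.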

Hope this helps, good luck!
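
As a footnote to the --py-files point in the answer, the argument order can be sketched with a small helper that assembles the command line. The helper name build_submit_command and the file name deps.zip are hypothetical; only the --master, --deploy-mode and --py-files flags come from spark-submit itself.

```python
import shlex


def build_submit_command(script, py_files=(), master="yarn", deploy_mode="cluster"):
    """Assemble a spark-submit argument list with options before the script."""
    cmd = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    if py_files:
        # --py-files takes a comma-separated list and must precede the script.
        cmd += ["--py-files", ",".join(py_files)]
    # Everything after the script path is passed to the script as its own arguments.
    cmd.append(script)
    return cmd


# deps.zip is a placeholder for whatever extra Python code the job needs.
cmd = build_submit_command("test.py", py_files=["deps.zip"])
print(" ".join(shlex.quote(part) for part in cmd))
```

The list form can be handed to subprocess.run as-is; the point of the sketch is only that options placed after test.py would be treated as arguments to the script rather than to spark-submit.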