New Spark user here. I'm extracting features from many .tif images stored on AWS S3, each with identifier like 02_R4_C7. I'm using Spark 2.2.1 and hadoop 2.7.2.
I'm using all default configurations like so:
conf = SparkConf().setAppName("Feature Extraction")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
And here is the function call that this fails on after some features are successfully saved in an image id folder as part-xxxx.gz files:
features_labels_rdd.saveAsTextFile(text_rdd_direct,"org.apache.hadoop.io.compress.GzipCodec")
See error below. When I delete the feature part-xxxx.gz files that were successfully created and rerun the script, it fails at a different image and part-xxxxx.gz in a seemingly nondeterminsitic way. I make sure to remove all features before rerunning. My theory is that two workers are trying to create the same temp file and are conflicting with each other, since there are two identical error messages for the same file, but one second apart.
I'm at a loss about what to do about this, I've seen that spark lists configurations that can change how spark handles tasks but I'm not sure what would help here since I don't understand the issue I'm having. Any help is greatly appreciated!
SLF4J: Class path contains multiple SLF4J bindings.
*SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-
log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/26 19:24:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/26 19:24:41 WARN spark.SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
n images = 512
Feature file of 02_R4_C7 is created
[Stage 3:=================> (6 + 14) / 20]18/06/26 19:24:58 ERROR mapred.SparkHadoopMapRedUtil: Error committing the output of task: attempt_20180626192453_0003_m_000007_59
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:172)
at org.apache.hadoop.mapred.OutputCommitter.commitTask(OutputCommitter.java:343)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
at org.apache.spark.internal.io.SparkHadoopWriter.commit(SparkHadoopWriter.scala:105)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1146)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[Stage 3:=====================================> (13 + 7) / 20]18/06/26 19:24:58 ERROR executor.Executor: Exception in task 7.0 in stage 3.0 (TID 59)
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:172)
at org.apache.hadoop.mapred.OutputCommitter.commitTask(OutputCommitter.java:343)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
at org.apache.spark.internal.io.SparkHadoopWriter.commit(SparkHadoopWriter.scala:105)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1146)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/06/26 19:24:58 ERROR scheduler.TaskSetManager: Task 7 in stage 3.0 failed 1 times; aborting job
Traceback (most recent call last):
File "run_feature_extraction_spark.py", line 88, in <module>
main(sc)
File "run_feature_extraction_spark.py", line 75, in main
features_labels_rdd.saveAsTextFile(text_rdd_direct, "org.apache.hadoop.io.compress.GzipCodec")
File "/home/ubuntu/.local/lib/python2.7/site-packages/pyspark/rdd.py", line 1551, in saveAsTextFile
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
File "/home/ubuntu/.local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/ubuntu/.local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/home/ubuntu/.local/lib/python2.7/site-packages/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o76.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 3.0 failed 1 times, most recent failure: Lost task 7.0 in stage 3.0 (TID 59, localhost, executor driver): java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz*
And when I run it again, the script makes it farther but fails with the same error with a different image folder and part-xxxx.gz file
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/26 19:37:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/26 19:37:24 WARN spark.SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
n images = 512
Feature file of 02_R4_C7 is created
Feature file of 02_R4_C6 is created
Feature file of 02_R4_C5 is created
Feature file of 02_R4_C4 is created
Feature file of 02_R4_C3 is created
Feature file of 02_R4_C2 is created
Feature file of 02_R4_C1 is created
[Stage 15:==========================================> (15 + 5) / 20]18/06/26 19:38:16 ERROR mapred.SparkHadoopMapRedUtil: Error committing the output of task: attempt_20180626193811_0015_m_000017_285
java.io.IOException: Failed to rename FileStatus{path=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C0/_temporary/0/_temporary/attempt_20180626193811_0015_m_000017_285/part-00017.gz; isDirectory=false; length=896020; replication=1; blocksize=67108864; modification_time=1530041897000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C0/part-00017.gz