Getting error in Spark: Executor lost

Posted 2019-04-13 18:55

Question:

I have one master and two slaves, each with 32 GB of RAM, and I'm reading a CSV file with around 18 million records (the first row contains the column headers).

This is the command I am using to run the job:

./spark-submit --master yarn --deploy-mode client --executor-memory 10g <path/to/.py file>

I did the following:

rdd = sc.textFile("<path/to/file>")
h = rdd.first()
header_rdd = rdd.map(lambda l: h in l)
data_rdd = rdd.subtract(header_rdd)
data_rdd.first()

I'm getting the following error message:

15/10/12 13:52:03 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: 192.168.1.114:51058
15/10/12 13:52:03 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: 192.168.1.114:51058
15/10/12 13:52:03 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@192.168.1.114:51058] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
15/10/12 13:52:03 ERROR cluster.YarnScheduler: Lost executor 1 on hslave2: remote Rpc client disassociated
15/10/12 13:52:03 INFO scheduler.TaskSetManager: Re-queueing tasks for 1 from TaskSet 3.0
15/10/12 13:52:03 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@hslave2:58555] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
15/10/12 13:52:03 WARN scheduler.TaskSetManager: Lost task 6.6 in stage 3.0 (TID 208, hslave2): ExecutorLostFailure (executor 1 lost)

This error came up while rdd.subtract() was running. I then modified the code, replacing rdd.subtract() with rdd.filter().

Modified code:

rdd = sc.textFile("<path/to/file>")
h = rdd.first()
data_rdd = rdd.filter(lambda l: h not in l)

But I got the same error.

Does anyone know why the executor is getting lost?

Is it because of inadequate memory in the machines running the cluster?

Answer 1:

This isn't a Spark bug per se, but is probably related to the settings you have for Java, YARN, and your Spark config file.

See http://apache-spark-user-list.1001560.n3.nabble.com/Executor-Lost-Failure-td18486.html

You'll want to increase your Java memory, increase your Akka frame size, increase the Akka timeout settings, etc.

Try the following spark.conf:

spark.master                       yarn-cluster
spark.yarn.historyServer.address   <your cluster url>
spark.eventLog.enabled             true
spark.eventLog.dir                 hdfs://<your history directory>
spark.driver.extraJavaOptions      -Xmx20480m -XX:MaxPermSize=2048m -XX:ReservedCodeCacheSize=2048m
spark.checkpointDir                hdfs://<your checkpoint directory>
yarn.log-aggregation-enable        true
spark.shuffle.service.enabled      true
spark.shuffle.service.port         7337
spark.shuffle.consolidateFiles     true
spark.sql.parquet.binaryAsString   true
spark.speculation                  false
spark.yarn.maxAppAttempts          1
spark.akka.askTimeout              1000
spark.akka.timeout                 1000
spark.akka.frameSize               1000
spark.rdd.compress                 true
spark.storage.memoryFraction       1
spark.core.connection.ack.wait.timeout 600
spark.driver.maxResultSize         0
spark.task.maxFailures             20
spark.shuffle.io.maxRetries        20
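
If you would rather apply a list like this from inside the PySpark program than from a config file, something along these lines might work. This is only a sketch: parse_conf, spark_only and build_conf are hypothetical helper names, and it assumes the settings are whitespace-separated key/value lines as above. Keys that do not start with "spark." (such as yarn.log-aggregation-enable, which is a YARN property that belongs in yarn-site.xml) are filtered out before being passed to SparkConf. Also keep in mind that driver JVM options generally have to be set before the driver starts, so setting them here may have no effect in client mode.

```python
def parse_conf(text):
    """Parse 'key   value' lines into (key, value) pairs, skipping blanks and # comments."""
    pairs = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)  # split on the first run of whitespace only
        if len(parts) == 2:
            pairs.append((parts[0], parts[1].strip()))
    return pairs


def spark_only(pairs):
    """Keep only the properties that Spark itself reads (keys starting with 'spark.')."""
    return [(k, v) for k, v in pairs if k.startswith("spark.")]


def build_conf(pairs):
    """Build a SparkConf from the Spark-only pairs (pyspark is imported lazily)."""
    from pyspark import SparkConf
    return SparkConf().setAll(spark_only(pairs))
```

You would then pass the result to the context, e.g. SparkContext(conf=build_conf(parse_conf(text))).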

You might also want to experiment with the number of partitions you request inside your Spark program. Note that in PySpark partitionBy() applies only to key-value RDDs, so for a plain text RDD you can pass a minimum partition count to textFile() instead. Your code might then look like this:

num_partitions = <your number of partitions>

rdd = sc.textFile("<path/to/file>", minPartitions=num_partitions)
h = rdd.first()
header_rdd = rdd.filter(lambda l: h in l)
data_rdd = rdd.subtract(header_rdd)
data_rdd.first()
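
Since subtract() has to shuffle the whole dataset just to remove one line, it may also be worth trying a version that drops the header without a shuffle. The following is a minimal sketch, not something from the original question: drop_first_line_of_first_partition and strip_header are hypothetical names, and it assumes a single input file whose header is the very first line (so it ends up as the first record of partition 0).

```python
from itertools import islice


def drop_first_line_of_first_partition(index, lines):
    """Skip the first record of partition 0; pass every other partition through unchanged."""
    return islice(lines, 1, None) if index == 0 else lines


def strip_header(rdd):
    """Return the RDD without its first line, using mapPartitionsWithIndex (no shuffle)."""
    return rdd.mapPartitionsWithIndex(drop_first_line_of_first_partition)
```

Usage would be data_rdd = strip_header(sc.textFile("<path/to/file>")). If the input is a directory of several files that each carry a header, this would not be enough on its own.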

Finally, you may need to experiment with your spark-submit command and add parameters for the number of executors, executor memory, and driver memory:

./spark-submit --master yarn --deploy-mode client --num-executors 100 --driver-memory 20G --executor-memory 10g <path/to/.py file>
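
When picking these numbers, it can help to estimate how many executors actually fit on a node, because YARN allocates each executor a container that is larger than --executor-memory. The sketch below is a rough back-of-the-envelope calculation only. It assumes the memory overhead is the larger of 384 MB and 10% of executor memory (the default as far as I recall; it differs between Spark versions and can be overridden), and it ignores YARN rounding container sizes up to its allocation increment. The function names and the 28 GB figure in the usage note are made up for illustration.

```python
def container_mb(executor_memory_mb, overhead_percent=10, min_overhead_mb=384):
    """Approximate YARN container size: executor heap plus off-heap overhead (assumed defaults)."""
    overhead = max(min_overhead_mb, executor_memory_mb * overhead_percent // 100)
    return executor_memory_mb + overhead


def executors_per_node(yarn_node_memory_mb, executor_memory_mb):
    """How many executor containers fit in the memory YARN may hand out on one node."""
    return yarn_node_memory_mb // container_mb(executor_memory_mb)
```

For example, if YARN were allowed to use 28 GB on each 32 GB slave (a hypothetical value; check yarn.nodemanager.resource.memory-mb), executors_per_node(28 * 1024, 10 * 1024) gives 2, so a two-slave cluster could hold only a handful of 10g executors no matter what --num-executors requests.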


Answer 2:

I got an executor-lost error because I was using sc.wholeTextFiles() and one of my input files was fairly large, at 149 MB. I don't think 149 MB is actually very large, but it was enough to make the executor fail.