Spark 1.6 on EMR writing to S3 as Parquet hangs and eventually fails


I'm building an uber-jar Spark application that I'm submitting via spark-submit to an EMR 4.3 cluster. I'm provisioning 4 r3.xlarge instances: one as the master and the other three as core nodes.

I have Hadoop 2.7.1, Ganglia 3.7.2, Spark 1.6, and Hive 1.0.0 pre-installed from the console.

I'm running the following command:

spark-submit \
--deploy-mode cluster \
--executor-memory 4g \
--executor-cores 2 \
--num-executors 4 \
--driver-memory 4g \
--driver-cores 2 \
--conf "spark.driver.maxResultSize=2g" \
--conf "spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter" \
--conf "spark.shuffle.memoryFraction=0.2" \
--class com.jackar.spark.Main spark-app.jar args
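For reference, here are the three --conf values above expressed programmatically on the SparkConf. This is just a sketch of the equivalent; in practice I pass them on the command line as shown:

// Sketch only: the same three settings applied on the SparkConf in the driver.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("spark-app")
  .set("spark.driver.maxResultSize", "2g")
  .set("spark.shuffle.memoryFraction", "0.2")
  .set("spark.hadoop.spark.sql.parquet.output.committer.class",
    "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")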

I realise that I'm not fully utilising the cluster, but at this point I'm not exactly trying to tune (or maybe that's what I should be doing?). My underlying job does something along the lines of:

1) Read parquet files from S3 that represent two datasets, run registerTempTable on the DataFrames, then run cacheTable on each. They are each about 300 MB in memory. (Note: I've tried this using EMR's s3:// protocol as well as s3a://.)

2) Use spark sql to run aggregations (i.e. sums and group bys).

3) Write the results to S3 as parquet files (see the sketch below).
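To make this concrete, here's a rough sketch of the pipeline (the paths, table names, and the query are simplified placeholders, not my actual code):

// Spark 1.6-style sketch; sqlContext is the SQLContext created in Main.
// Paths, table names, and the query below are placeholders.
val dfA = sqlContext.read.parquet("s3a://bucket/dataset_a/")
val dfB = sqlContext.read.parquet("s3a://bucket/dataset_b/")

dfA.registerTempTable("dataset_a")
dfB.registerTempTable("dataset_b")
sqlContext.cacheTable("dataset_a")  // each table is ~300 MB once cached
sqlContext.cacheTable("dataset_b")

// aggregations via Spark SQL (sums and group bys)
val agg = sqlContext.sql(
  "SELECT key, SUM(value) AS total FROM dataset_a GROUP BY key")

// write the aggregated results back to S3 as parquet
agg.write.parquet("s3a://bucket/output/agg/")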

The jobs run just fine when I look at the Spark UI, and they take about as long as I expect them to. The problem is that after the write-agg-to-parquet-in-s3 job finishes (Jobs tab), there is a period of time where no other jobs get queued up.

If I then go to the SQL tab in the Spark UI, I notice there is a "running query" for the same job that the Jobs tab says has completed. When I click in and look at the DAG for that query, it appears to have already been evaluated.

[Screenshot: DAG visualisation]

However, this "query" takes minutes and sometimes causes the entire Spark application to restart and eventually fail...

[Screenshot: Spark UI, SQL tab]

I began investigating to see if I could figure out the issue, because on my Databricks trial this job executes incredibly quickly, with a DAG identical to the one on EMR (as expected). But I can't bring myself to justify using Databricks when I have no idea why I'm not seeing similar performance on EMR.

Maybe it's my JVM params? Garbage collection for instance? Time to check the executor logs.

2016-02-23T18:25:48.598+0000: [GC2016-02-23T18:25:48.598+0000: [ParNew: 299156K->30449K(306688K), 0.0255600 secs] 1586767K->1329022K(4160256K), 0.0256610 secs] [Times: user=0.05 sys=0.00, real=0.03 secs] 
2016-02-23T18:25:50.422+0000: [GC2016-02-23T18:25:50.422+0000: [ParNew: 303089K->32739K(306688K), 0.0263780 secs] 1601662K->1342494K(4160256K), 0.0264830 secs] [Times: user=0.07 sys=0.01, real=0.02 secs] 
2016-02-23T18:25:52.223+0000: [GC2016-02-23T18:25:52.223+0000: [ParNew: 305379K->29373K(306688K), 0.0297360 secs] 1615134K->1348874K(4160256K), 0.0298410 secs] [Times: user=0.08 sys=0.00, real=0.03 secs] 
2016-02-23T18:25:54.247+0000: [GC2016-02-23T18:25:54.247+0000: [ParNew: 302013K->28521K(306688K), 0.0220650 secs] 1621514K->1358123K(4160256K), 0.0221690 secs] [Times: user=0.06 sys=0.01, real=0.02 secs] 
2016-02-23T18:25:57.994+0000: [GC2016-02-23T18:25:57.994+0000: [ParNew: 301161K->23609K(306688K), 0.0278800 secs] 1630763K->1364319K(4160256K), 0.0279460 secs] [Times: user=0.07 sys=0.01, real=0.03 secs]

Okay, that doesn't look good: ParNew is a stop-the-world collection, and it's happening every couple of seconds.

Next step: look at the Spark UI on Databricks to see if the GC configuration is any different from EMR's. I found something kind of interesting. Databricks sets spark.executor.extraJavaOptions to:

-XX:ReservedCodeCacheSize=256m 
-XX:+UseCodeCacheFlushing 
-javaagent:/databricks/DatabricksAgent.jar 
-XX:+PrintFlagsFinal 
-XX:+PrintGCDateStamps 
-verbose:gc 
-XX:+PrintGCDetails 
-XX:+HeapDumpOnOutOfMemoryError 
-Ddatabricks.serviceName=spark-executor-1

Hey, I'm no GC expert (I've added "learn to tune GC" to my todo list), but what I see here is more than just GC params for the executors. What does DatabricksAgent.jar do, and does it help? I'm not sure, so I force my Spark job to use the same Java options for the executors, minus the Databricks-specific stuff:

--conf spark.executor.extraJavaOptions="-XX:ReservedCodeCacheSize=256m -XX:+UseCodeCacheFlushing -XX:+PrintFlagsFinal -XX:+PrintGCDateStamps -verbose:gc -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError"

This doesn't change the "running query" behaviour - it still takes forever - but I do get PSYoungGen collections instead of ParNew (the frequency is still every couple of seconds, though):

2016-02-23T19:40:58.645+0000: [GC [PSYoungGen: 515040K->12789K(996352K)] 1695803K->1193777K(3792896K), 0.0203380 secs] [Times: user=0.03 sys=0.01, real=0.02 secs] 
2016-02-23T19:57:50.463+0000: [GC [PSYoungGen: 588789K->13391K(977920K)] 1769777K->1196033K(3774464K), 0.0237240 secs] [Times: user=0.04 sys=0.00, real=0.02 secs] 

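I haven't actually tried swapping collectors yet (GC tuning is still on the todo list), but for reference, pointing the executors at a different collector, e.g. G1, would just be more standard HotSpot flags on the same setting, something along the lines of:

--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+PrintGCDateStamps -verbose:gc -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError"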
If you've read up to here, I commend you. I know how long of a post this is.

Another symptom I found is that while the query is "running", stderr and stdout are at a standstill: no new log lines are added on any executor, or on the driver.

i.e.

16/02/23 19:41:23 INFO ContextCleaner: Cleaned shuffle 5
16/02/23 19:57:32 INFO DynamicPartitionWriterContainer: Job job_201602231940_0000 committed.

That same ~17 minute gap is accounted for in the Spark UI as the running query... Any idea what's going on?

In the end the job tends to restart after a few of the aggregations get written to S3 (say 10% of them), and eventually the Spark application fails.

I'm not sure if the issue is related to the fact that EMR runs on YARN while Databricks runs on a Standalone cluster or if it's completely unrelated.
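One thing I may try so the restart doesn't hide the original failure (I'm assuming the restart is YARN retrying the application attempt, which I haven't confirmed) is capping the number of attempts:

--conf spark.yarn.maxAppAttempts=1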

The failure I end up seeing when I dig into the YARN logs is the following:

java.io.FileNotFoundException: No such file or directory: s3a://bucket_file_stuff/_temporary/0/task_201602232109_0020_m_000000/
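For completeness, this is roughly how I'm pulling the container logs once the application finishes (the application ID is a placeholder for my real one):

yarn logs -applicationId <application_id> > app.log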

Any advice is much appreciated. I'll add notes as I go. Thanks!
