Extremely slow S3 write times from EMR/Spark

Asked 2019-02-01 04:16

I'm writing to see if anyone knows how to speed up S3 write times from Spark running in EMR?

My Spark job takes over 4 hours to complete; however, the cluster is only under load during the first 1.5 hours.


I was curious about what Spark was doing all this time. I looked at the logs and found many s3 mv commands, one for each file. Taking a look directly at S3, I see all my files are in a _temporary directory.

Second, I'm concerned about cluster cost: it appears I only need to buy 2 hours of compute for this specific task; however, I end up buying up to 5 hours. I'm curious whether EMR AutoScaling can help with cost in this situation.

Some articles discuss changing the file output committer algorithm but I've had little success with that.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

Writing to the local HDFS is quick. I'm curious whether issuing a hadoop command to copy the data to S3 would be faster.


6 Answers
放我归山
answered 2019-02-01 05:00

What you are seeing is a problem with the output committer and S3. The job commit applies fs.rename on the _temporary folder, and since S3 does not support rename, this means a single process is now copying and deleting all the files from _temporary to their final destination.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2") only works with Hadoop 2.7 and later. What it does is copy each file out of _temporary at task commit rather than job commit, so the work is distributed across the executors and runs pretty fast.
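
A minimal sketch of how this is typically wired up (df and the destination path are placeholders); the same property can also be passed at submit time as --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2:

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
df.write.mode("overwrite").parquet("s3a://my-bucket/output/")  // hypothetical bucket/path; each task commits its own files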

If you use an older version of Hadoop, I would use Spark 1.6 and set:

sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

*Note that it does not work with speculation turned on or when writing in append mode.

**Also note that it was removed in Spark 2.0 (superseded by algorithm.version=2).

BTW, in my team we actually write with Spark to HDFS and use DistCp jobs (specifically s3-dist-cp) in production to copy the files to S3, but this is done for several other reasons (consistency, fault tolerance), so it is not strictly necessary; you can write to S3 pretty fast using what I suggested.
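
For reference, a minimal sketch of that pattern on EMR (the HDFS path and bucket are hypothetical), run after the Spark job has written its output to HDFS:

s3-dist-cp --src hdfs:///tmp/job-output --dest s3://my-bucket/job-output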

唯我独甜
answered 2019-02-01 05:12

The direct committer was pulled from Spark as it wasn't resilient to failures. I would strongly advise against using it.

There is ongoing work in Hadoop (s3guard) to add zero-rename committers, which will be O(1) and fault tolerant; keep an eye on HADOOP-13786.

Ignoring "the Magic committer" for now, the Netflix-based staging committer will ship first (hadoop 2.9? 3.0?)

  1. It writes the work to the local FS at task commit.
  2. It issues uncommitted multipart PUT operations to write the data, but does not materialize it.
  3. It saves the information needed to commit the PUTs to HDFS, using the original "algorithm 1" file output committer.
  4. It implements a job commit which uses the file output commit of HDFS to decide which PUTs to complete and which to cancel.

Result: task commit takes data/bandwidth seconds, but job commit takes no longer than the time to do 1-4 GETs on the destination folder and a POST for every pending file, the latter being parallelized.

You can pick up the committer this work is based on from Netflix and probably use it in Spark today. Do set the file commit algorithm to 1 (it should be the default) or it won't actually write the data.
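
For context only (this is a sketch, not part of the original answer): once these committers shipped in later Hadoop releases, enabling the staging committer looks roughly like the following. The property names assume Hadoop 3.1+ with the S3A committers plus Spark's cloud-integration (spark-hadoop-cloud) bindings; the exact wiring is described in the Hadoop S3A committer documentation.

sc.hadoopConfiguration.set("fs.s3a.committer.name", "directory")                    // Netflix-style staging (directory) committer
sc.hadoopConfiguration.set("fs.s3a.committer.staging.conflict-mode", "replace")     // fail / append / replace on existing output
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "1")  // keep algorithm 1, as noted above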

做个烂人
answered 2019-02-01 05:18

How large are the files you are writing? Having one core write a very large file is going to be much slower than splitting the data up and having multiple workers write out smaller files.
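
As an illustration (the DataFrame, partition count, and destination below are placeholders), repartitioning before the write lets many tasks upload smaller objects in parallel instead of one core writing a single huge file:

df.repartition(64)                     // example value; tune to cluster size and data volume
  .write
  .mode("overwrite")
  .parquet("s3a://my-bucket/output/")  // hypothetical destination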

干净又极端
answered 2019-02-01 05:19

I had a similar use case where I used Spark to write to S3 and had performance issues. The primary reason was that Spark was creating a lot of zero-byte part files, and renaming temp files to their actual file names was slowing down the write process. I tried the approaches below as workarounds:

  1. Write the output of Spark to HDFS and use Hive to write to S3. Performance was much better, as Hive created fewer part files. The problem I had (I also had the same issue when using Spark) was that the delete action was not granted by policy in the prod environment for security reasons. The S3 bucket was KMS-encrypted in my case.

  2. Write the Spark output to HDFS, copy the HDFS files to local disk, and use aws s3 cp to push the data to S3 (see the sketch after this list). This gave the second-best results. I created a ticket with Amazon and they suggested going with this one.

  3. Use s3-dist-cp to copy the files from HDFS to S3. This worked with no issues, but was not performant.
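
A rough sketch of option 2 (all paths and bucket names below are hypothetical):

hdfs dfs -get /user/hadoop/job-output ./job-output             # pull the part files off HDFS
aws s3 cp ./job-output s3://my-bucket/job-output/ --recursive  # push them to S3 with the AWS CLI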

劳资没心,怎么记你
answered 2019-02-01 05:19

We experienced the same on Azure using Spark on WASB. We finally decided not to use the distributed storage directly with Spark. We did spark.write to a real hdfs:// destination and developed a specific tool that does: hadoop copyFromLocal hdfs:// wasb://. HDFS is then our temporary buffer before archiving to WASB (or S3).
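
As a rough illustration of that second hop (the container, storage account, and paths are hypothetical; hadoop distcp is shown here since it copies between any two Hadoop filesystems):

hadoop distcp hdfs:///spark/output wasb://mycontainer@myaccount.blob.core.windows.net/archive/output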

聊天终结者
answered 2019-02-01 05:20

What do you see in the Spark output? If you see lots of rename operations, read this.
