I have a Spark job where I am doing an outer join between two data frames. The first data frame is 260 GB of text files split into 2,200 files, and the second data frame is 2 GB. Writing the roughly 260 GB output data frame to S3 takes a very long time, more than 2 hours, after which I cancelled the job because I was being charged heavily on EMR.
Here is my cluster info:
emr-5.9.0
Master: m3.2xlarge
Core: 10 x r4.16xlarge (each machine has 64 vCores, 488 GiB memory, 100 GiB EBS storage)
This is the cluster config that I am setting:
capacity-scheduler yarn.scheduler.capacity.resource-calculator: org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
emrfs-site fs.s3.maxConnections: 200
spark maximizeResourceAllocation: true
spark-defaults spark.dynamicAllocation.enabled: true
I also tried setting the memory components manually as below; the performance was better, but it still took a very long time:
--num-executors 60 --conf spark.yarn.executor.memoryOverhead=9216 --executor-memory 72G --conf spark.yarn.driver.memoryOverhead=3072 --driver-memory 26G --executor-cores 10 --driver-cores 3 --conf spark.default.parallelism=1200
I am not using the default partitioning when saving data into S3.
I am adding all the details about the job and the query plan so that it is easy to understand.
The real culprit is the partitioning, and that is where most of the time goes. Because I have 2K input files, if I repartition to something like 200, the output comes out as hundreds of thousands of files, and loading those back into Spark is not a good story.
In the image below, I don't know why a sort is called again after the project.
In the image below, GC time is too high for my liking. Do I have to handle this? If so, please suggest how.
Below is the node health status. At this point data is being saved into S3, and I don't understand why only two nodes are active while the rest are idle.
These are the cluster details while the data is loading. At this point the cluster is fully utilized, but while saving data into S3 many nodes are free.
Finally, here is my code where I perform the join and then save into S3:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").desc)
val latestForEachKey = df2resultTimestamp.withColumn("rank", row_number.over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val columnMap = latestForEachKey.columns.filter(c => c.endsWith("_1") & c != "FFAction|!|_1").map(c => c -> c.dropRight(2)) :+ ("FFAction|!|_1", "FFAction|!|")
val exprs = columnMap.map(t => coalesce(col(s"${t._1}"), col(s"${t._2}")).as(s"${t._2}"))
val exprsExtended = Array(col("uniqueFundamentalSet"), col("PeriodId"), col("SourceId"), col("StatementTypeCode"), col("StatementCurrencyId"), col("FinancialStatementLineItem_lineItemId")) ++ exprs
// Joining both data frames here
val dfMainOutput = (dataMain.join(latestForEachKey, Seq("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId"), "outer") select (exprsExtended: _*)).filter(!$"FFAction|!|".contains("D|!|"))
// Join ends here
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"PartitionYear", $"PartitionStatement", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").filter(_ != "PartitionStatement").map(c => col(c)): _*).as("concatenated"))
val headerColumn = dataHeader.columns.toSeq
val headerFinal = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "\\|\\^\\|null", "")).withColumnRenamed("concatenated", headerFinal)
dfMainOutputFinalWithoutNull
  // .repartition($"DataPartition", $"PartitionYear", $"PartitionStatement")
  .write
.partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
.format("csv")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.option("nullValue", "")
.option("delimiter", "\t")
.option("quote", "\u0000")
.option("header", "true")
.option("codec", "bzip2")
.save(outputFileURL)
You are running five c3.4xlarge EC2 instances, which have 30 GB of RAM each. That is only 150 GB in total, which is much smaller than the >200 GB data frame you are joining, hence lots of disk spill. Maybe you can launch r-type EC2 instances (memory-optimized, as opposed to c-type, which are compute-optimized) instead and see if there is a performance improvement.
S3 is an object store, not a file system, hence the issues arising from eventual consistency and non-atomic rename operations: every time the executors write the result of the job, each of them writes to a temporary directory outside the main directory (on S3) where the files are supposed to land, and once all the executors are done, a rename is performed to get atomic exclusivity. This is all fine on a standard filesystem like HDFS, where renames are instantaneous, but on an object store like S3 it is not, as renames on S3 are done at around 6 MB/s.
To overcome the above problem, ensure the following two configuration parameters are set (a sketch of how to set both follows below):
1) spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
With the default value of this parameter, i.e. 1, commitTask moves data generated by a task from the task temporary directory to the job temporary directory, and when all tasks complete, commitJob moves the data from the job temporary directory to the final destination. Because the driver does the work of commitJob, this operation can take a long time on S3, and a user may think their job is “hanging”. When mapreduce.fileoutputcommitter.algorithm.version is 2, however, commitTask moves the data generated by a task directly to the final destination, and commitJob is basically a no-op.
2) spark.speculation=false
If this parameter is set to true, then when one or more tasks run slowly in a stage, they are re-launched. As mentioned above, the write operation to S3 from a Spark job is very slow, so we can see many tasks getting re-launched as the output data size increases.
This, along with eventual consistency (while moving files from the temporary directory to the main data directory), may cause FileOutputCommitter to go into deadlock, and hence the job could fail.
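A minimal sketch of setting both parameters, assuming a SparkSession is built inside the application (the app name is a placeholder); the same values can equally be passed as --conf flags to spark-submit:

import org.apache.spark.sql.SparkSession

// Sketch only: apply the committer algorithm and speculation settings at session build time.
// Equivalent command-line form:
//   spark-submit \
//     --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \
//     --conf spark.speculation=false \
//     ...
val spark = SparkSession.builder()
  .appName("outer-join-and-write-to-s3") // placeholder app name
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.speculation", "false")
  .getOrCreate()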
Alternatively
You can write the output first to the local HDFS on EMR and then move the data to S3 using the hadoop distcp command. This improves the overall output speed drastically. However, you will need enough EBS storage on your EMR nodes to ensure all your output data fits in.
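A rough sketch of that two-step approach, reusing the data frame from the question; the HDFS path and S3 bucket below are placeholders, not paths from the original job:

// Sketch: write to HDFS on the cluster first (placeholder path).
dfMainOutputFinalWithoutNull
  .write
  .partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
  .format("csv")
  .option("delimiter", "\t")
  .option("header", "true")
  .save("hdfs:///user/hadoop/joined-output")
// ...then, once the job has finished, copy to S3 from the shell, e.g.:
//   hadoop distcp hdfs:///user/hadoop/joined-output s3://my-bucket/joined-output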
Further, you can write the output data in ORC format, which will reduce the output size considerably.
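And a minimal sketch of the ORC variant, again with a placeholder output path (the concatenated/renamed columns from the question may need ORC-friendly names, so treat this as illustrative only):

// Sketch: ORC is columnar and compresses well; snappy is a common codec choice.
dfMainOutputFinalWithoutNull
  .write
  .partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
  .format("orc")
  .option("compression", "snappy")
  .save("s3://my-bucket/joined-output-orc")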
Reference:
https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98