I have a Spark SQL job that used to run in under 10 minutes but now takes 3 hours after a cluster migration, and I need to dig into what it's actually doing. I'm new to Spark, so please bear with me if I ask something unrelated.
I increased spark.executor.memory, but no luck.
Env: Azure HDInsight Spark 2.4 on Azure Storage
SQL: Reads and joins some data, then writes the result to a table in the Hive metastore.
The spark.sql script ends with the code below:
.write.mode("overwrite").saveAsTable("default.mikemiketable")
Application Behavior:
Within the first 15 minutes, it loads the data and completes most tasks (199/200), leaving only 1 executor alive that keeps reading and writing shuffle data. Because only 1 executor is left, we have to wait 3 hours for the application to finish.
[Screenshot: only 1 executor left alive]
I'm not sure what the executor is doing.
From time to time, we can see the shuffle read increase.
Therefore I increased spark.executor.memory to 20g, but nothing changed. In Ambari and YARN I can see that the cluster has plenty of resources left.
[Screenshot: almost all executors released]
Any guidance is greatly appreciated.
I would like to start with some observations for your case:
- From the task list you can see that Shuffle Spill (Disk) and Shuffle Spill (Memory) both have very high values. The maximum block size for each partition during the data exchange should not exceed 2GB, so you should keep the size of the shuffled data as low as possible. As a rule of thumb, each partition should be ~200-500MB. For instance, if the total data is 100GB, you need roughly 200-500 partitions to keep the partition size within those limits (100GB / 500MB = 200, 100GB / 200MB = 500).
- Having high values for both spill metrics also means that executor memory was not sufficient and Spark was forced to spill data to disk.
- The task durations are too high. A normal task should last roughly 50-200ms.
- A large number of killed executors is another sign that you are facing OOM problems.
- Locality is RACK_LOCAL, which is one of the lowest locality levels you can get within a cluster. Briefly, it means the task runs on a different node from the one where the data is stored.
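The ~200-500MB per-partition rule of thumb above can be turned into a quick calculation. This is a minimal sketch: `suggest_partition_range` is a hypothetical helper, it treats 1 GB as 1000 MB so the results are approximate, and `total_gb` is whatever size estimate you have for the shuffled data (for example, the stage's total Shuffle Write in the Spark UI).

```python
import math
from typing import Tuple


def suggest_partition_range(total_gb: float,
                            min_partition_mb: float = 200,
                            max_partition_mb: float = 500) -> Tuple[int, int]:
    """Hypothetical helper: return (fewest, most) partitions that keep each
    partition roughly between min_partition_mb and max_partition_mb.

    Uses decimal units (1 GB = 1000 MB), so results are approximate.
    """
    total_mb = total_gb * 1000
    # Largest allowed partitions -> fewest partitions; round up so none exceeds the max.
    fewest = math.ceil(total_mb / max_partition_mb)
    # Smallest allowed partitions -> most partitions; round down so none falls below the min.
    most = math.floor(total_mb / min_partition_mb)
    # If the range is too narrow to satisfy both bounds, fall back to `fewest`.
    return fewest, max(fewest, most)


if __name__ == "__main__":
    print(suggest_partition_range(100))  # (200, 500)
```

For 100GB this gives 200 to 500 partitions; the default spark.sql.shuffle.partitions of 200 would sit at the very top of the per-partition size range.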
As a solution, I would try the following:
- Increase the number of partitions, either with repartition() or through the spark.sql.shuffle.partitions setting (default 200), to a number that meets the requirements above, e.g. 1000 or more.
- Change the way you store the data by partitioning it, e.g. by day/month/year, using partitionBy.
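Both suggestions can be sketched in PySpark as follows. This is a sketch under assumptions, not the asker's actual job: the function names are hypothetical, 1000 is the example count from the answer, and the year/month/day columns are placeholders for whatever date columns your result actually has. spark.conf.set, DataFrame.repartition, and DataFrameWriter.mode/partitionBy/saveAsTable are standard PySpark calls. The 200 tasks in the stage above likely come from the default spark.sql.shuffle.partitions of 200.

```python
# Assumes PySpark (the .write.mode(...).saveAsTable(...) line above is valid PySpark).
# Function names and the year/month/day columns are hypothetical.


def configure_shuffle_partitions(spark, num_partitions):
    """Set how many partitions joins and aggregations shuffle into.

    Spark's default for spark.sql.shuffle.partitions is 200. Call this
    before running the query, e.g. right after creating the session.
    """
    spark.conf.set("spark.sql.shuffle.partitions", str(num_partitions))


def write_partitioned_table(df, table, num_partitions, partition_cols):
    """Overwrite a metastore table, stored in one directory per value
    of partition_cols (e.g. year=2020/month=1/day=15)."""
    (df.repartition(num_partitions)      # spread rows evenly over num_partitions tasks
       .write
       .mode("overwrite")
       .partitionBy(*partition_cols)     # directory layout by partition columns
       .saveAsTable(table))


# Example usage (needs a running SparkSession with Hive support):
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# configure_shuffle_partitions(spark, 1000)
# result = spark.sql("...")  # your existing read/join query
# write_partitioned_table(result, "default.mikemiketable", 1000,
#                         ["year", "month", "day"])
```

Repartitioning by a plain count spreads rows evenly across tasks, but each task may then write one file per partition value it holds, so a large count combined with partitionBy can produce many small files. Repartitioning by the partition columns instead (df.repartition(*partition_cols)) yields fewer files, at the cost of less evenly sized tasks.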