Spark SQL slow execution with resource idle

2019-05-28 07:52发布

问题:

I have a Spark SQL that used to execute < 10 mins now running at 3 hours after a cluster migration and need to deep dive on what it's actually doing. I'm new to spark and please don't mind if I'm asking something unrelated.

Increased spark.executor.memory but no luck.

Env: Azure HDInsight Spark 2.4 on Azure Storage

SQL: Read and Join some data and finally write result to a Hive metastore.

The spark.sql script ends with below code: .write.mode("overwrite").saveAsTable("default.mikemiketable")

Application Behavior: Within the first 15 mins, it loads and complete most tasks (199/200); left only 1 executor process alive and continually to shuffle read / write data. Because now it only leave 1 executor, we need to wait 3 hours until this application finish.

Left only 1 executor alive

Not sure what's the executor doing:

From time to time, we can tell the shuffle read increased:

Therefore I increased the spark.executor.memory to 20g, but nothing changed. From Ambari and YARN I can tell the cluster has many resources left.

Release of almost all executor

Any guidance is greatly appreciated.

回答1:

I would like to start with some observations for your case:

  1. From the tasks list you can see that that Shuffle Spill (Disk) and Shuffle Spill (Memory) have both very high values. The max block size for each partition during the exchange of data should not exceed 2GB therefore you should be aware to keep the size of shuffled data as low as possible. As rule of thumb you need to remember that the size of each partition should be ~200-500MB. For instance if the total data is 100GB you need at least 250-500 partitions to keep the partition size within the mentioned limits.
  2. The co-existence of two previous it also means that the executor memory was not sufficient and Spark was forced to spill data to the disk.
  3. The duration of the tasks is too high. A normal task should lasts between 50-200ms.
  4. Too many killed executors is another sign which shows that you are facing OOM problems.
  5. Locality is RACK_LOCAL which is considered one of the lowest values you can achieve within a cluster. Briefly, that means that the task is being executed in a different node than the data is stored.

As solution I would try the next few things:

  • Increase the number of partitions by using repartition() or via Spark settings with spark.sql.shuffle.partitions to a number that meets the requirements above i.e 1000 or more.
  • Change the way you store the data and introduce partitioned data i.e day/month/year using partitionBy