What should be the optimal value for spark.sql.shuffle.partitions?

Posted 2019-01-13 10:29

Question:

Hi, I am using Spark SQL (actually hiveContext.sql()) with GROUP BY queries and I am running into OOM issues. I am thinking of increasing the value of spark.sql.shuffle.partitions from the default of 200 to 1000, but it is not helping. Please correct me if I am wrong: these partitions share the data shuffle load, so the more partitions, the less data each one holds. Please guide me, I am new to Spark. I am using Spark 1.4.0 and I have around 1 TB of uncompressed data to process with hiveContext.sql() GROUP BY queries.

Answer 1:

If you're running out of memory on the shuffle, try setting spark.sql.shuffle.partitions to 2001.
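
As a minimal sketch of how you would apply that (Spark 1.4-era API; the table name my_table and the query are just placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("shuffle-tuning"))
val hiveContext = new HiveContext(sc)

// 2001 crosses the 2000 threshold shown below, so Spark switches to the
// more compact HighlyCompressedMapStatus for shuffle book-keeping
hiveContext.setConf("spark.sql.shuffle.partitions", "2001")
val result = hiveContext.sql("SELECT key, count(*) FROM my_table GROUP BY key")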

Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000:

// From Spark's org.apache.spark.scheduler.MapStatus:
private[spark] object MapStatus {

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      // more than 2000 partitions: compact representation of block sizes
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      // 2000 or fewer: one compressed size per reduce partition
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
...

I really wish they would let you configure this independently.

By the way, I found this information in a Cloudera slide deck.



Answer 2:

OK, so I think your issue is more general. It's not specific to Spark SQL; it's a general problem with Spark, which ignores the number of partitions you tell it to use when the files are few. Spark seems to create the same number of partitions as there are files on HDFS, unless you call repartition. So calling repartition ought to work, but it has the caveat of causing a shuffle somewhat unnecessarily.
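
A rough sketch of that workaround (Spark 1.4 DataFrame API; my_table is a placeholder):

// repartition forces the partition count regardless of the file count,
// at the cost of a full shuffle
val df = hiveContext.table("my_table")
println(df.rdd.partitions.length)   // tends to match the number of HDFS files
val wider = df.repartition(1000)    // 1000 partitions for downstream stages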

I raised this question a while ago and still haven't gotten a good answer :(

Spark: increase number of partitions without causing a shuffle?



Answer 3:

It actually depends on your data and your queries. If Spark must load the full 1 TB, something is wrong with your design.

Use the superb web UI to see the DAG, i.e. how Spark translates your SQL query into jobs, stages, and tasks.

Useful metrics are "Input" and "Shuffle".

  • Partition your data (Hive / directory layout like /year=X/month=X)
  • Use Spark's CLUSTER BY feature to work per data partition
  • Use the ORC / Parquet file formats, because they provide push-down filters: useless data is not loaded into Spark (see the sketch after this list)
  • Analyze the Spark History UI to see how Spark is reading your data
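
To illustrate the first three pointers together, here is a hedged sketch; the tables events_raw / events_parquet and their columns are hypothetical:

// Write partitioned Parquet so partition pruning and push-down filters
// keep irrelevant data out of Spark entirely
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hiveContext.sql("""
  CREATE TABLE events_parquet (user_id STRING, amount DOUBLE)
  PARTITIONED BY (year INT, month INT)
  STORED AS PARQUET""")
hiveContext.sql("""
  INSERT OVERWRITE TABLE events_parquet PARTITION (year, month)
  SELECT user_id, amount, year, month FROM events_raw
  CLUSTER BY user_id""")
// This query now reads only /year=2019/month=1 instead of the whole 1 TB
hiveContext.sql("""
  SELECT user_id, sum(amount) FROM events_parquet
  WHERE year = 2019 AND month = 1
  GROUP BY user_id""")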

Also, could the OOM be happening on your driver?

-> That is a different issue: at the end, the driver collects the data you ask for. If you ask for too much data, the driver will OOM; try limiting your query, or write the result to another table (Spark syntax CREATE TABLE ...AS).
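
A minimal sketch of the two options (result_table and the query are placeholders):

// Risky: collect() pulls the entire result into driver memory
// val rows = hiveContext.sql("SELECT key, count(*) FROM my_table GROUP BY key").collect()

// Safer: keep the result on the cluster by writing it as a table
hiveContext.sql(
  "CREATE TABLE result_table AS SELECT key, count(*) AS cnt FROM my_table GROUP BY key")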



Answer 4:

I came across this post from Cloudera about Hive partitioning. Check out the "Pointers" section, which discusses how the number of partitions, times the number of files in each partition, can overload the NameNode, which might cause OOM.
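
A back-of-the-envelope sketch of that failure mode (all numbers assumed): with dynamic partition inserts, each writer task can create one file per Hive partition it touches, so the file count multiplies quickly:

val hivePartitions = 365   // e.g. one partition per day (assumed)
val writerTasks = 1000     // spark.sql.shuffle.partitions (assumed)
// Worst case: every task writes into every partition
val files = hivePartitions * writerTasks
println(s"Up to $files small files for the NameNode to track")   // 365000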