Partitioning a large skewed dataset in S3 with Spark

Posted 2019-02-28 05:11

Question:

I am trying to write out a large partitioned dataset to disk with Spark, and the partitionBy algorithm is struggling with both of the approaches I've tried.

The partitions are heavily skewed - some of the partitions are massive and others are tiny.

Problem #1:

When I use repartition before partitionBy, Spark writes out each partition as a single file, even the huge ones:

val df = spark.read.parquet("some_data_lake")

// repartition('some_col) puts each distinct some_col value into a single
// shuffle partition, so each one is written out as a single file
df
  .repartition('some_col)
  .write.partitionBy("some_col")
  .parquet("partitioned_lake")

This takes forever to execute because Spark isn't writing the big partitions in parallel. If one of the partitions has 1TB of data, Spark will try to write the entire 1TB of data as a single file.
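
To see how bad the skew is before writing, a quick row count per partition value helps; this is just a hedged sketch that assumes the partition column is some_col and that df is the DataFrame read above.

import org.apache.spark.sql.functions.desc

// Rough skew check: rows per some_col value, biggest groups first.
// Row counts are only a proxy for bytes, but they make the skew visible.
df.groupBy("some_col")
  .count()
  .orderBy(desc("count"))
  .show(20, false)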

Problem #2:

When I don't use repartition, Spark writes out way too many files.

This code will write out an insane number of files.

df.write.partitionBy("some_col").parquet("partitioned_lake")

I ran this on a tiny 8 GB data subset and Spark wrote out 85,000+ files!

When I tried running this on a production data set, one partition that has 1.3 GB of data was written out as 3,100 files.
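
The file count blows up because, without a repartition, every upstream task writes its own file for each some_col value it happens to contain, so the total is roughly (number of tasks) × (distinct values per task). A minimal check of the task count, again assuming df from above:

// Number of partitions (tasks) feeding the write; each one can emit a
// separate file per some_col value it contains.
println(df.rdd.getNumPartitions)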

What I'd like:

I'd like for each partition to get written out as 1 GB files. So a partition that has 7 GB of data will get written out as 7 files and a partition that has 0.3 GB of data will get written out as a single file.

What is my best path forward?

Answer 1:

The simplest solution is to repartition by one or more additional columns and to explicitly set the number of partitions.

val numPartitions = ???

df.repartition(numPartitions, $"some_col", $"some_other_col")
  .write.partitionBy("some_col")
  .parquet("partitioned_lake")

where:

  • numPartitions should be an upper bound (the actual number can be lower) on the desired number of files written to a single partition directory; see the sketch after this list for one way to estimate it.
  • $"some_other_col" (and any optional additional columns) should have high cardinality and be independent of $"some_col" (there should be no functional dependency between the two, and they shouldn't be highly correlated).

    If the data doesn't contain such a column, you can use o.a.s.sql.functions.rand.

    import org.apache.spark.sql.functions.rand

    df.repartition(numPartitions, $"some_col", rand())
      .write.partitionBy("some_col")
      .parquet("partitioned_lake")