Why is Spark DataFrame creating the wrong number of partitions?


Question:

I have a spark dataframe with 2 columns - col1 and col2.

scala> val df = List((1, "a")).toDF("col1", "col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]

When I write df to disk in Parquet format, I want the data to land in a number of files equal to the number of unique values in col1, so I repartition on col1 first, like this:

scala> df.repartition(col("col1")).write.partitionBy("col1").parquet("file")

The above code produces only one file in the filesystem, but the shuffle it triggers uses 200 partitions.

What I am not able to understand is this: if col1 contains only one value, i.e. 1, why does repartition create 200 partitions?
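
For reference, here is a minimal sketch of how I observe that number (assuming the same spark-shell session and default settings; rdd.getNumPartitions reports the DataFrame's partition count):

import org.apache.spark.sql.functions.col

// sketch: partition count right after the repartition step
val repartitioned = df.repartition(col("col1"))
println(repartitioned.rdd.getNumPartitions)  // 200 with default settings, even though col1 has a single value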

Answer 1:

repartition(columnName) by default creates 200 partitions (more specifically, spark.sql.shuffle.partitions partitions), no matter how many unique values of col1 there are. If there is only 1 unique value of col1, then 199 of the partitions are empty. On the other hand, if you have more than 200 unique values of col1, you will have multiple values of col1 per partition.
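
As a quick sanity check of that claim, here is a minimal sketch (assuming the single-row df from the question and default settings) that counts the records in each partition after the repartition:

import org.apache.spark.sql.functions.col

// sketch: record counts per partition after repartitioning on col1
val counts = df.repartition(col("col1")).rdd
  .mapPartitions(it => Iterator(it.size))
  .collect()

println(counts.length)        // 200 partitions in total
println(counts.count(_ > 0))  // only 1 partition holds data; the other 199 are empty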

If you only want 1 partition, then you can do repartition(1, col("col1")) or just coalesce(1). But note that coalesce does not behave the same way: coalesce may be moved further up in your execution plan, so you may lose parallelism (see How to prevent Spark optimization).
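
A minimal sketch of the two options, assuming the df from the question (getNumPartitions only shows the resulting partition count, not where in the plan the repartitioning happens):

import org.apache.spark.sql.functions.col

// sketch: both end up with a single partition for this DataFrame
df.repartition(1, col("col1")).rdd.getNumPartitions  // 1, hash-partitioned on col1 via a shuffle
df.coalesce(1).rdd.getNumPartitions                  // 1, no full shuffle, but can reduce upstream parallelism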

If you want to check the content of your partitions, I've made 2 methods for this:

import org.apache.spark.sql.DataFrame

// calculates the record count per partition
def inspectPartitions(df: DataFrame) = {
  import df.sqlContext.implicits._
  df.rdd
    .mapPartitions(partIt => Iterator(partIt.size))
    .toDF("record_count")
}

// inspects how a given key is distributed across the partitions of a dataframe
def inspectPartitions(df: DataFrame, key: String) = {
  import df.sqlContext.implicits._
  df.rdd
    .mapPartitions { partIt =>
      val part = partIt.toSeq
      val partSize = part.size
      // distinct values of the key that ended up in this partition
      val partKeys = part.map(r => r.getAs[Any](key).toString.trim).distinct
      Iterator((partKeys.toArray, partSize))
    }
    .toDF("partitions", "record_count")
}

Now you can e.g. check your dataframe like this:

inspectPartitions(df.repartition(col("col1")), "col1")
  .where($"record_count" > 0)
  .show


Answer 2:

In the Spark SQL shuffle world, the default number of shuffle partitions is 200, and it is controlled by the spark.sql.shuffle.partitions setting.
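
For illustration, a minimal sketch of the two usual ways to change that setting (the property name is real; the application name below is just a placeholder):

// sketch: change the shuffle partition count for subsequent queries in an existing session
spark.conf.set("spark.sql.shuffle.partitions", "50")

// or set it when building the session in a standalone application
import org.apache.spark.sql.SparkSession
val session = SparkSession.builder()
  .appName("shuffle-partitions-demo")            // placeholder app name
  .config("spark.sql.shuffle.partitions", "50")
  .getOrCreate()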