How to use DataFrame withColumn and not change the number of partitions

Published 2019-03-01 07:50

Question:

For some reason I have to convert an RDD to a DataFrame, then do something with the DataFrame.

My interface is an RDD, so I have to convert the DataFrame back to an RDD afterwards. When I use df.withColumn, the number of partitions changes to 1, so I have to repartition and sortBy the RDD again.

Is there any cleaner solution?

This is my code:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val rdd = sc.parallelize(List(1, 3, 2, 4, 5, 6, 7, 8), 4)
val partition = rdd.getNumPartitions
println(partition + " rdd")                    // 4

val df = rdd.toDF()
val rdd2 = df.rdd                              // still 4 partitions

val result = rdd.toDF("col1")
  .withColumn("csum", sum($"col1").over(Window.orderBy($"col1")))
  .withColumn("rownum", row_number().over(Window.orderBy($"col1")))
  .withColumn("avg", $"csum" / $"rownum")
  .rdd

println(result.getNumPartitions + " rdd2")     // 1 after the window
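
For reference, the repartition-and-sortBy workaround mentioned above would look roughly like this (a sketch, assuming col1 at field index 0 is the sort key and partition is the count captured earlier; restored is a hypothetical name):

// Sketch of the workaround: restore the original partition count and
// ordering on the resulting RDD[Row] after the window collapsed it to 1.
// Sorting by col1 (field index 0) is an assumption about the desired order.
val restored = result.sortBy(_.getInt(0), ascending = true, numPartitions = partition)

println(restored.getNumPartitions + " restored")   // back to 4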

Answer 1:

Let's make this as simple as possible: we will generate the same data in 4 partitions.

scala> val df = spark.range(1,9,1,4).toDF
df: org.apache.spark.sql.DataFrame = [id: bigint]

scala> df.show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
+---+

scala> df.rdd.getNumPartitions
res13: Int = 4

We don't need 3 window functions to prove this, so let's do it with one:

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val df2 = df.withColumn("csum", sum($"id").over(Window.orderBy($"id")))
df2: org.apache.spark.sql.DataFrame = [id: bigint, csum: bigint]

What's happening here is that we didn't just add a column: we computed a cumulative sum over a window of the data. Since you haven't provided a partitioning column, the window function moves all the data to a single partition, and you even get a warning from Spark:

scala> df2.rdd.getNumPartitions
17/06/06 10:05:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
res14: Int = 1

scala> df2.show
17/06/06 10:05:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+----+
| id|csum|
+---+----+
|  1|   1|
|  2|   3|
|  3|   6|
|  4|  10|
|  5|  15|
|  6|  21|
|  7|  28|
|  8|  36|
+---+----+

So let's now add a column to partition on. We will create a new DataFrame just for the sake of demonstration:

scala> val df3 = df.withColumn("x", when($"id"<5,lit("a")).otherwise("b"))
df3: org.apache.spark.sql.DataFrame = [id: bigint, x: string]

It indeed has the same number of partitions that we explicitly defined on df:

scala> df3.rdd.getNumPartitions
res18: Int = 4

Let's perform our window operation, this time partitioning by the column x:

scala> val df4 = df3.withColumn("csum", sum($"id").over(Window.orderBy($"id").partitionBy($"x")))
df4: org.apache.spark.sql.DataFrame = [id: bigint, x: string ... 1 more field]

scala> df4.show
+---+---+----+                                                                  
| id|  x|csum|
+---+---+----+
|  5|  b|   5|
|  6|  b|  11|
|  7|  b|  18|
|  8|  b|  26|
|  1|  a|   1|
|  2|  a|   3|
|  3|  a|   6|
|  4|  a|  10|
+---+---+----+

The window function will repartition our data using the default number of shuffle partitions set in the Spark configuration (spark.sql.shuffle.partitions, which defaults to 200).

scala> df4.rdd.getNumPartitions
res20: Int = 200
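
If 200 is not what you want, one option (an assumption about how spark.sql.shuffle.partitions behaves, not something stated in the original answer) is to lower that setting before the window runs; df5 is a hypothetical name:

// Sketch: lower the shuffle partition count before the window runs, so the
// post-shuffle count matches the original 4 instead of the default 200.
// Assumes a Spark 2.x+ session named `spark`, plus df3 and column x from above.
spark.conf.set("spark.sql.shuffle.partitions", "4")

val df5 = df3.withColumn("csum",
  sum($"id").over(Window.partitionBy($"x").orderBy($"id")))

println(df5.rdd.getNumPartitions)   // 4 with the setting above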


Answer 2:

I was just reading about controlling the number of partitions when using a groupBy aggregation, from https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-performance-tuning-groupBy-aggregation.html. It seems the same trick works with Window. In my code I define a window like

windowSpec = Window \
    .partitionBy('colA', 'colB') \
    .orderBy('timeCol') \
    .rowsBetween(1, 1)

and then doing

next_event = F.lead('timeCol', 1).over(windowSpec)

and creating a dataframe via

df2 = df.withColumn('next_event', next_event)

and indeed, it has 200 partitions. But if I do

df2 = df.repartition(10, 'colA', 'colB').withColumn('next_event', next_event)

it has 10!
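
For completeness, the same trick applied to the Scala example from the first answer would look roughly like this (a sketch; df6 is a hypothetical name, and it reuses df3 and the x column from above):

// Sketch: repartition by the window's partition column first. The window's
// clustering requirement is then already satisfied, so no extra shuffle is
// added and the partition count stays at 4 instead of jumping to 200.
val df6 = df3
  .repartition(4, $"x")
  .withColumn("csum", sum($"id").over(Window.partitionBy($"x").orderBy($"id")))

println(df6.rdd.getNumPartitions)   // 4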