My question is triggered by the use case of calculating the differences between consecutive rows in a Spark DataFrame.
For example, I have:
>>> df.show()
+-----+----------+
|index| col1|
+-----+----------+
| 0.0|0.58734024|
| 1.0|0.67304325|
| 2.0|0.85154736|
| 3.0| 0.5449719|
+-----+----------+
If I choose to calculate these differences using Window functions, I can do it like so:
>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as f
>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc())
>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show()
+-----+----------+-----------+
|index| col1| diffs_col1|
+-----+----------+-----------+
| 0.0|0.58734024|0.085703015|
| 1.0|0.67304325| 0.17850411|
| 2.0|0.85154736|-0.30657548|
| 3.0| 0.5449719| null|
+-----+----------+-----------+
Question: I explicitly partitioned the DataFrame into a single partition. What is the performance impact of this, and if there is one, why is that so and how could I avoid it? I ask because when I do not specify a partition, I get the following warning:
16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
In practice the performance impact will be almost the same as if you omitted the partitionBy clause altogether. All records will be shuffled to a single partition, sorted locally, and iterated sequentially one by one.

The difference is only in the number of partitions created in total. Let's illustrate that with an example using a simple dataset with 10 partitions and 1000 records:
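A minimal sketch of such a dataset; it assumes an active SparkSession bound to spark, and the variable names below are mine:

    from pyspark.sql import SparkSession
    from pyspark.sql.window import Window
    import pyspark.sql.functions as f

    spark = SparkSession.builder.getOrCreate()

    # 1000 sequential ids spread over 10 partitions, plus a random value column
    df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))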
If you define a frame without a partitionBy clause and use it with lag, there will be only one partition in total:
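A sketch of that case (w_unpart and df_lag_unpart are my names; I also use a positive lag offset here, i.e. differences against the previous row, but the point is the same for the negative offset in the question):

    # Ordering only - no partitionBy
    w_unpart = Window.orderBy(f.col("index").asc())

    df_lag_unpart = df.withColumn(
        "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
    )

    # glom() returns the rows of each partition as one list;
    # a single list of length 1000 means one partition holds everything.
    df_lag_unpart.rdd.glom().map(len).collect()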
Compared to that, a frame definition with a dummy partitioning key (simplified a bit compared to your code) will use a number of partitions equal to spark.sql.shuffle.partitions, with only one non-empty partition:
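Sketched here with f.lit(0) as the dummy key; the df.index >= 0 predicate from the question plays the same role, a constant for every row:

    w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())

    df_lag_part = df.withColumn(
        "diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")
    )

    df_lag_part.rdd.glom().count()                      # one per shuffle partition
    df_lag_part.rdd.glom().filter(lambda x: x).count()  # but only 1 is non-empty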
Unfortunately there is no universal solution to this problem in PySpark; it is an inherent consequence of the implementation combined with the distributed processing model.
Since the index column is sequential, you could generate an artificial partitioning key with a fixed number of records per block and use it to define the frame specification:
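A sketch of that idea; deriving the block size from spark.sql.shuffle.partitions is my choice here, any fixed block size would do:

    # Fixed number of records per block
    rec_per_block = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))

    df_with_block = df.withColumn(
        "block", (f.col("index") / rec_per_block).cast("int")
    )

    # Partition by the artificial key, order within each block
    w_with_block = Window.partitionBy("block").orderBy("index")

    df_lag_with_block = df_with_block.withColumn(
        "diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")
    )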
This will use the expected number of partitions, with a roughly uniform data distribution (we cannot avoid hash collisions), but with a number of gaps on the block boundaries: the first row of each block has no predecessor inside its own partition, so its difference comes out null. All three effects can be checked as shown below:
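The same glom() inspection as above makes the three points visible:

    df_lag_with_block.rdd.glom().count()             # the expected number of partitions
    df_lag_with_block.rdd.glom().map(len).collect()  # roughly uniform sizes; hash collisions cause some skew

    # One null per block: each block-leading row lacks a predecessor
    # inside its partition.
    df_lag_with_block.where(f.col("diffs_col1").isNull()).count()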
Since the boundaries are easy to compute, you can always select the rows that sit on them and fill these separately, as sketched below:
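A sketch of those steps, assuming sequential identifiers (for a general monotonically increasing id you would take the min and max per block instead):

    from itertools import chain

    # First index of each block; that row and the one just before it
    # straddle a block boundary.
    block_starts = [
        row["start"] for row in
        df_with_block.groupBy("block").agg(f.min("index").alias("start")).collect()
    ]

    boundary_idxs = sorted(chain.from_iterable(
        (idx - 1, idx) for idx in block_starts
    ))[2:]  # drop the (-1, 0) pair - the global first row has no predecessor

    missing = df_with_block.where(f.col("index").isin(boundary_idxs))

    # The unpartitioned window is fine here: only a handful of rows,
    # though it still triggers the single-partition warning.
    missing_diffs = missing.withColumn(
        "diffs_fill", f.lag("col1", 1).over(w_unpart) - f.col("col1")
    ).select("index", "diffs_fill")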
and join the fills back to get the desired result:
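A sketch of the final step: a left outer join on index, coalescing the block-wise difference with the fill (only block-leading rows pick up a fill; everywhere else the original value wins):

    combined = (
        df_lag_with_block
        .join(missing_diffs, ["index"], "left_outer")
        .withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill"))
        .drop("diffs_fill")
    )

    # Only the very first row should still be null
    combined.where(f.col("diffs_col1").isNull()).count()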