I've started using Spark SQL and DataFrames in Spark 1.4.0. I'm wanting to define a custom partitioner on DataFrames, in Scala, but not seeing how to do this.
One of the data tables I'm working with contains a list of transactions, by account, silimar to the following example.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
At least initially, most of the calculations will occur between the transactions within an account. So I would want to have the data partitioned so that all of the transactions for an account are in the same Spark partition.
But I'm not seeing a way to define this. The DataFrame class has a method called 'repartition(Int)', where you can specify the number of partitions to create. But I'm not seeing any method available to define a custom partitioner for a DataFrame, such as can be specified for an RDD.
The source data is stored in Parquet. I did see that when writing a DataFrame to Parquet, you can specify a column to partition by, so presumably I could tell Parquet to partition it's data by the 'Account' column. But there could be millions of accounts, and if I'm understanding Parquet correctly, it would create a distinct directory for each Account, so that didn't sound like a reasonable solution.
Is there a way to get Spark to partition this DataFrame so that all data for an Account is in the same partition?
Use the DataFrame returned by:
There is no explicit way to use partitionBy on a DataFrame, only on a PairRDD, but when you sort a DataFrame, it will use that in it's LogicalPlan and that will help when you need to make calculations on each Account.
I just stumbled upon the same exact issue, with a dataframe that I want to partition by account. I assume that when you say "want to have the data partitioned so that all of the transactions for an account are in the same Spark partition", you want it for scale and performance, but your code doesn't depend on it (like using mapPartitions() etc), right?
So to start with some kind of answer : ) - You can't
I am not an expert, but as far as I understand DataFrames, they are not equal to rdd and DataFrame has no such thing as Partitioner.
Generally DataFrame's idea is to provide another level of abstraction that handles such problems itself. The queries on DataFrame are translated into logical plan that is further translated to operations on RDDs. The partitioning you suggested will probably be applied automatically or at least should be.
If you don't trust SparkSQL that it will provide some kind of optimal job, you can always transform DataFrame to RDD[Row] as suggested in of the comments.
In Spark < 1.6 If you create a
HiveContext
, not the plain oldSqlContext
you can use the HiveQLDISTRIBUTE BY colX...
(ensures each of N reducers gets non-overlapping ranges of x) &CLUSTER BY colX...
(shortcut for Distribute By and Sort By) for example;Not sure how this fits in with Spark DF api. These keywords aren't supported in the normal SqlContext (note you dont need to have a hive meta store to use the HiveContext)
EDIT: Spark 1.6+ now has this in the native DataFrame API
Spark >= 2.3.0
SPARK-22614 exposes range partitioning.
SPARK-22389 exposes external format partitioning in the Data Source API v2.
Spark >= 1.6.0
In Spark >= 1.6 it is possible to use partitioning by column for query and caching. See: SPARK-11410 and SPARK-4849 using
repartition
method:Unlike
RDDs
SparkDataset
(includingDataset[Row]
a.k.aDataFrame
) cannot use custom partitioner as for now. You can typically address that by creating an artificial partitioning column but it won't give you the same flexibility.Spark < 1.6.0:
One thing you can do is to pre-partition input data before you create a
DataFrame
Since
DataFrame
creation from anRDD
requires only a simple map phase existing partition layout should be preserved*:The same way you can repartition existing
DataFrame
:So it looks like it is not impossible. The question remains if it make sense at all. I will argue that most of the time it doesn't:
Repartitioning is an expensive process. In a typical scenario most of the data has to be serialized, shuffled and deserialized. From the other hand number of operations which can benefit from a pre-partitioned data is relatively small and is further limited if internal API is not designed to leverage this property.
GROUP BY
- it is possible to reduce memory footprint of the temporary buffers**, but overall cost is much higher. More or less equivalent togroupByKey.mapValues(_.reduce)
(current behavior) vsreduceByKey
(pre-partitioning). Unlikely to be useful in practice.SqlContext.cacheTable
. Since it looks like it is using run length encoding, applyingOrderedRDDFunctions.repartitionAndSortWithinPartitions
could improve compression ratio.Performance is highly dependent on a distribution of the keys. If it is skewed it will result in a suboptimal resource utilization. In the worst case scenario it will be impossible to finish the job at all.
Related concepts
Partitioning with JDBC sources:
JDBC data sources support
predicates
argument. It can be used as follows:It creates a single JDBC partition per predicate. Keep in mind that if sets created using individual predicates are not disjoint you'll see duplicates in the resulting table.
partitionBy
method inDataFrameWriter
:Spark
DataFrameWriter
providespartitionBy
method which can be used to "partition" data on write. It separates data on write using provided set of columnsThis enables predicate push down on read for queries based on key:
but it is not equivalent to
DataFrame.repartition
. In particular aggregations like:will still require
TungstenExchange
:bucketBy
method inDataFrameWriter
(Spark >= 2.0):bucketBy
has similar applications aspartitionBy
but it is available only for tables (saveAsTable
). Bucketing information can used to optimize joins:* By partition layout I mean only a data distribution.
partitioned
RDD has no longer a partitioner. ** Assuming no early projection. If aggregation covers only small subset of columns there is probably no gain whatsoever.I was able to do this using RDD. But I don't know if this is an acceptable solution for you. Once you have the DF available as an RDD, you can apply
repartitionAndSortWithinPartitions
to perform custom repartitioning of data.Here is a sample I used: