I want to know if Spark knows the partitioning key of the parquet file and uses this information to avoid shuffles.
Context:
I am running Spark 2.0.1 with a local SparkSession. I have a CSV dataset that I am saving as a parquet file on my disk like so:
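import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Read the CSV with every column as a string (no schema inference)
val df0 = spark
  .read
  .format("csv")
  .option("header", true)
  .option("delimiter", ";")
  .option("inferSchema", false)
  .load("SomeFile.csv")

// Hash-partition the rows into 42 partitions by card number
val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)

df.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .option("inferSchema", false)
  .save("SomeFile.parquet")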
I am creating 42 partitions by column numerocarte. This should group multiple numerocarte values into the same partition. I don't want to do partitionBy("numerocarte") at write time because I don't want one partition per card. There would be millions of them.
After that, in another script, I read this SomeFile.parquet parquet file and do some operations on it. In particular, I am running a window function on it where the partitioning is done on the same column that the parquet file was repartitioned by.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df2 = spark.read
  .format("parquet")
  .option("header", true)
  .option("inferSchema", false)
  .load("SomeFile.parquet")

// Window over all rows of one card, ordered by SomeColumn
val w = Window.partitionBy(col("numerocarte"))
  .orderBy(col("SomeColumn"))

df2.withColumn("NewColumnName",
  sum(col("dollars")).over(w))
After the read, I can see that the repartition worked as expected: DataFrame df2 has 42 partitions, and each of them holds different cards.
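One way to make this kind of check concrete is sketched below. It is only an illustration: the object name PartitionLayoutCheck, its helper names, and the tiny sample data are made up for the example, and it uses df.rdd.getNumPartitions plus the built-in spark_partition_id() function to see whether any single card is spread over more than one partition.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, countDistinct, max, spark_partition_id}

// Hypothetical helper object, not part of the original scripts.
object PartitionLayoutCheck {

  // Number of partitions currently backing the DataFrame.
  def numPartitions(df: DataFrame): Int = df.rdd.getNumPartitions

  // For every key value, count the distinct partitions that hold it and
  // return the largest count. A result of 1 means no key is split across
  // partitions. Assumes a non-empty DataFrame.
  def maxPartitionsPerKey(df: DataFrame, key: String): Long =
    df.select(col(key), spark_partition_id().as("pid"))
      .groupBy(col(key))
      .agg(countDistinct(col("pid")).as("n"))
      .agg(max(col("n")))
      .head()
      .getLong(0)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("partition-layout-check")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows standing in for the real CSV data.
    val sample = Seq(("A", 1.0), ("B", 2.0), ("A", 3.0), ("C", 4.0))
      .toDF("numerocarte", "dollars")
    val repartitioned = sample.repartition(42, col("numerocarte"))

    println(s"partitions: ${numPartitions(repartitioned)}")
    println(s"max partitions per card: ${maxPartitionsPerKey(repartitioned, "numerocarte")}")

    spark.stop()
  }
}
```

This only describes how the rows are physically laid out at that moment. It says nothing about whether Spark's planner is aware of the layout, which is what the questions below are about.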
Questions:
- Does Spark know that the dataframe df2 is partitioned by column numerocarte?
- If it knows, then there will be no shuffle in the window function. True?
- If it does not know, it will do a shuffle in the window function. True?
- If it does not know, how do I tell Spark the data is already partitioned by the right column?
- How can I check a partitioning key of DataFrame? Is there a command for this? I know how to check number of partitions but how to see partitioning key?
- When I print number of partitions in a file after each step, I have 42 partitions after read and 200 partitions after withColumn, which suggests that Spark repartitioned my DataFrame.
- If I have two different tables repartitioned with the same column, would the join use that information?
I am answering my own question to record, for future reference, what worked.
Following the suggestion of @user8371915, bucketBy works!
I save my DataFrame df as a bucketed table with bucketBy and saveAsTable, and when I need to load this table I read it back through spark.sql.
I confirm that when I do window functions on df2 partitioned by userid, there is no shuffle! Thanks @user8371915!
Some things I learned while investigating it:
- The table can also be loaded with spark.read.format("parquet").load("path/to/myNewTable"), but the DataFrame created this way will not keep the original partitioning! You must use a spark.sql select to get a correctly partitioned DataFrame.
- spark.sql("describe formatted myNewTable").collect.foreach(println) will tell you what columns were used for bucketing and how many buckets there are.
- You can add .sortBy() when writing, and the sort will also be preserved in the Hive table: df.write.bucketBy(250, "userid").sortBy("somColumnName").saveAsTable("myNewTable")
- myNewTable is saved to a spark-warehouse folder in my local Scala SBT project. When saving in cluster mode with Mesos via spark-submit, it is saved to the Hive warehouse. For me it was located in /user/hive/warehouse.
- With spark-submit you need to add two options to your SparkSession: .config("hive.metastore.uris", "thrift://addres-to-your-master:9083") and .enableHiveSupport(). Otherwise the Hive tables you created will not be visible.
- To save the table into a particular database, run spark.sql("USE your database") before bucketing.

Update 05-02-2018
I encountered some problems with spark bucketing and creation of Hive tables. Please refer to question, replies and comments in Why is Spark saveAsTable with bucketBy creating thousands of files?
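For readers who want to try the bucketed write and read described in this answer, here is a minimal sketch. It is not the exact code I ran: the object name BucketedWindowSketch, the sample rows, the dollars column and the bucket count of 8 are assumptions chosen for a small local test, and it relies on the default local catalog rather than a Hive metastore.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// Hypothetical sketch object; names and data are illustrative only.
object BucketedWindowSketch {

  // Writes a small bucketed table, reads it back through spark.sql and
  // applies a window function partitioned by the bucketing column.
  def windowedTotals(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val df = Seq((1, 10.0), (2, 5.0), (1, 2.5)).toDF("userid", "dollars")

    // bucketBy only works together with saveAsTable, not with save(path).
    df.write
      .mode("overwrite")
      .bucketBy(8, "userid") // 8 buckets is an arbitrary choice for the sketch
      .saveAsTable("myNewTable")

    // Read through the catalog so the bucketing metadata is available.
    val df2 = spark.sql("SELECT * FROM myNewTable")
    val w = Window.partitionBy(col("userid"))
    df2.withColumn("total", sum(col("dollars")).over(w))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("bucketing-sketch")
      .getOrCreate()

    val df3 = windowedTotals(spark)
    // Inspect the physical plan and look for Exchange nodes (shuffles).
    df3.explain()
    df3.show()

    spark.stop()
  }
}
```

Whether the plan is really free of Exchange nodes can depend on the Spark version and settings, so it is worth checking the explain() output rather than assuming it.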
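Does Spark know that the dataframe df2 is partitioned by column numerocarte?
It does not.
If it does not know, how do I tell Spark the data is already partitioned by the right column?
You don't. Just because you save data which has been shuffled does not mean that it will be loaded with the same splits.
How can I check a partitioning key of DataFrame?
There is no partitioning key once you have loaded the data, but you can check queryExecution for a Partitioner.
In practice:
- To support efficient pushdowns on the key, use the partitionBy method of DataFrameWriter.
- For limited support for join optimizations, use bucketBy with a metastore and persistent tables.
See How to define partitioning of DataFrame? for detailed examples.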
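The point about checking queryExecution can be illustrated with a rough sketch. The object name PartitioningInspection and its helpers are hypothetical, HashPartitioning is an internal Catalyst class that may change between versions, and adaptive query execution is switched off here as an assumption so that the executed plan is the plain physical plan.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.functions.col

// Hypothetical helper object for inspecting what the planner knows.
object PartitioningInspection {

  // Partitioning that the physical plan reports for its output.
  def partitioningOf(df: DataFrame): String =
    df.queryExecution.executedPlan.outputPartitioning.toString

  // True when the planner knows the output is hash partitioned.
  def isHashPartitioned(df: DataFrame): Boolean =
    df.queryExecution.executedPlan.outputPartitioning.isInstanceOf[HashPartitioning]

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("partitioning-inspection")
      .config("spark.sql.adaptive.enabled", "false") // assumption, see lead-in
      .getOrCreate()
    import spark.implicits._

    // Temporary output location, used only for this sketch.
    val path = java.nio.file.Files.createTempDirectory("pq").resolve("data").toString

    val shuffled = Seq(("A", 1.0), ("B", 2.0))
      .toDF("numerocarte", "dollars")
      .repartition(42, col("numerocarte"))
    println(s"after repartition: ${partitioningOf(shuffled)}")

    // Round-trip through plain parquet files, as in the question.
    shuffled.write.mode("overwrite").parquet(path)
    val reloaded = spark.read.parquet(path)
    println(s"after reload: ${partitioningOf(reloaded)}")
    println(s"reloaded is hash partitioned: ${isHashPartitioned(reloaded)}")

    spark.stop()
  }
}
```

Comparing the two printed values shows that the partitioning information attached to the repartitioned DataFrame is not carried over by writing plain parquet files and reading them back.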