I am trying to leverage Spark partitioning. I was trying to do something like
data.write.partitionBy("key").parquet("/location")
The issue here is that each partition creates a huge number of parquet files, which results in slow reads when I try to read from the root directory.
To avoid that I tried
data.coalesce(numPart).write.partitionBy("key").parquet("/location")
This, however, creates numPart parquet files in each partition. Since my partitions have different sizes, I would ideally like a separate coalesce per partition, but that doesn't look easy: I would have to visit every partition, coalesce it to a certain number of files, and store it at a separate location.
How should I use partitioning to avoid many files after write?
This is working for me very well:
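A rough sketch of that kind of write, assuming a hypothetical target of `n` files per key-directory; the random salt column and the toy `data` below are illustrations, not necessarily the exact code:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{floor, rand}

val spark = SparkSession.builder().appName("partitioned-write").getOrCreate()
import spark.implicits._

// Toy stand-in for the `data` DataFrame from the question.
val data = Seq((1, "a"), (1, "b"), (2, "c")).toDF("key", "value")

val n = 4 // hypothetical number of files wanted per key-directory

// Salt each row with a random bucket in [0, n) and repartition by (key, salt),
// so each key-directory is written by up to n tasks, i.e. up to n files.
data
  .withColumn("salt", floor(rand() * n))
  .repartition($"key", $"salt")
  .drop("salt")
  .write
  .partitionBy("key")
  .parquet("/location")
```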
It produces N files in each output partition (directory), and is (anecdotally) faster than using `coalesce` and (again anecdotally, on my data set) faster than only repartitioning on the output.

If you're working with S3, I also recommend doing everything on local drives (Spark does a lot of file creation/rename/deletion during write-outs) and, once it's all settled, using Hadoop's `FileUtil` (or just the aws cli) to copy everything over:
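For the copy step, something along these lines with `FileUtil.copy` (the local staging path and the S3 bucket below are placeholders):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()

// Placeholder paths: local staging directory written by Spark, and the final S3 location.
val localPath = new Path("file:///tmp/staging/location")
val s3Path    = new Path("s3a://my-bucket/location")

val localFs = FileSystem.get(localPath.toUri, conf)
val s3Fs    = FileSystem.get(s3Path.toUri, conf)

// Copy the finished output in one pass; deleteSource = false keeps the local copy.
FileUtil.copy(localFs, localPath, s3Fs, s3Path, false, conf)
```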
First, I would really avoid using `coalesce`, as it is often pushed further up the chain of transformations and may destroy the parallelism of your job (I asked about this issue here: How to prevent Spark optimization).

Writing 1 file per parquet-partition is relatively easy (see Spark dataframe write method writing many small files):
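For example (assuming `import spark.implicits._` is in scope and the partition column is called `key`, as in the question):

```scala
// All rows of a given key end up in the same task after the shuffle,
// so each key-directory gets exactly one parquet file.
data
  .repartition($"key")
  .write
  .partitionBy("key")
  .parquet("/location")
```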
If you want to set an arbitrary number of files (or files which all have the same size), you need to further repartition your data using another attribute (I cannot tell you what this might be in your case):
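A sketch, where `another_key` stands in for whatever second attribute you pick:

```scala
// Shuffle by (key, another_key): each key-directory then gets up to one file
// per distinct value of another_key that lands in a separate task.
data
  .repartition($"key", $"another_key")
  .write
  .partitionBy("key")
  .parquet("/location")
```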
`another_key` could be another attribute of your dataset, or a derived attribute using some modulo or rounding operations on existing attributes. You could even use window functions with `row_number` over `key` and then round that by something like the sketch below, which would put N records into 1 parquet file.
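A sketch of the `row_number` idea, ordering within each key by a hypothetical `id` column; `N` is the desired number of records per file:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{floor, row_number}

val N = 100000 // hypothetical records-per-file target

// Number the rows within each key, then bucket every block of N rows together.
val numbered = data.withColumn(
  "row_number",
  row_number().over(Window.partitionBy($"key").orderBy($"id"))
)

numbered
  .repartition($"key", floor($"row_number" / N))
  .drop("row_number")
  .write
  .partitionBy("key")
  .parquet("/location")
```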
using orderBy
You can also control the number of files without repartitioning by ordering your dataframe accordingly:
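A sketch of this approach:

```scala
// A global sort by key range-partitions the data into spark.sql.shuffle.partitions
// tasks; each task writes at most one file per key it holds.
data
  .orderBy($"key")
  .write
  .partitionBy("key")
  .parquet("/location")
```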
This will lead to a total of `spark.sql.shuffle.partitions` files (by default 200) across all partitions. It's even beneficial to add a second ordering column after `$key`, as parquet will remember the ordering of the dataframe and write its statistics accordingly. For example, you can order by an ID:
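A sketch, assuming the ID column is called `id`:

```scala
// Same number of files as before, but rows inside each file are sorted by id,
// so parquet's min/max column statistics help when filtering on id.
data
  .orderBy($"key", $"id")
  .write
  .partitionBy("key")
  .parquet("/location")
```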
This will not change the number of files, but it will improve the performance when you query your parquet file for a given `key` and `id`. See e.g. https://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide and https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example