Is there a way to control number of part files in

Posted 2020-08-01 02:19

Question:

When I save the DataFrame produced by a Spark SQL query to HDFS, it generates a large number of part files, each about 1.4 KB and containing about 2 records. Is there a way to increase the file size?

df_crimes_dates_formated = spark.sql('SELECT CONCAT( SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)) AS DATES , Primary_Type , COUNT(1) AS COUNT  FROM crimes_data Group By CONCAT( SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)) , Primary_Type ORDER BY CONCAT( SUBSTR(Dates,1,2), SUBSTR(Dates,7,4)) , COUNT(1) DESC' )

df_crimes_dates_formated.write.save('hdfs:///user/maria_dev/crimes/monthly_crimes/') 

Answer 1:

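You can use either .repartition() or .coalesce(), depending on your use case, to control the number of files written to HDFS. repartition() performs a full shuffle and can increase or decrease the number of partitions; coalesce() can only decrease it and avoids a full shuffle.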

#get the number of partitions of the DataFrame; the number of part files Spark writes depends on it
>>> df_crimes_dates_formated.rdd.getNumPartitions()

#create 10 part files in HDFS
>>> df_crimes_dates_formated.repartition(10).write.save('hdfs:///user/maria_dev/crimes/monthly_crimes/') 

Calculating the number of partitions dynamically:

Decide how many rows each partition should hold so that each file comes out at the desired size, then divide the DataFrame's row count by that number to get the number of partitions.

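import math

total_rows = df.count()
#3

#required rows for each partition
rows = 1
#round up, and use at least 1 partition because repartition() needs a positive number
partitions = max(1, math.ceil(total_rows / rows))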

#repartition with partitions value
df.repartition(partitions).rdd.getNumPartitions()
#3
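
The calculation above can be wrapped in a small helper. This is only a sketch and not part of the original answer: the function names `partitions_for_rows` and `rows_per_file_for_size` are made up for illustration, and the average record size has to be estimated by you (the question's figures suggest roughly 1.4 KB / 2 records, about 700 bytes).

```python
import math


def partitions_for_rows(total_rows: int, rows_per_file: int) -> int:
    """Return a partition count so each partition holds about rows_per_file rows."""
    if rows_per_file <= 0:
        raise ValueError("rows_per_file must be positive")
    # Round up so the average partition does not exceed the target,
    # and never return 0 because repartition() needs a positive number.
    return max(1, math.ceil(total_rows / rows_per_file))


def rows_per_file_for_size(target_file_bytes: int, avg_record_bytes: int) -> int:
    """Rows that fit in a file of about target_file_bytes, given an assumed average record size."""
    if avg_record_bytes <= 0:
        raise ValueError("avg_record_bytes must be positive")
    return max(1, target_file_bytes // avg_record_bytes)
```

It could then be used as `df.repartition(partitions_for_rows(df.count(), rows))`. Since repartition() spreads rows only approximately evenly, the resulting file sizes are approximate as well.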

In addition:

Since Spark 2.2, you can also use the maxRecordsPerFile option to limit the number of rows written to each file, for example when the DataFrame has only 1 partition.

#assuming df_crimes_dates_formated has 1 partition, Spark writes files with at most 100 records each
df_crimes_dates_formated.write.option("maxRecordsPerFile", 100).save("hdfs:///user/maria_dev/crimes/monthly_crimes/")
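
To see how maxRecordsPerFile interacts with the partition count, here is a rough pure-Python estimate of the number of part files. It is a sketch under an assumption, not Spark's actual code: each non-empty partition is written by one task, which starts a new file once the current one reaches the limit. The function name `estimated_file_count` is hypothetical.

```python
import math


def estimated_file_count(rows_per_partition, max_records_per_file):
    """Estimate the number of part files for the given per-partition row counts."""
    if max_records_per_file <= 0:
        # A zero or negative value means no limit: one file per non-empty partition.
        return sum(1 for n in rows_per_partition if n > 0)
    # Each non-empty partition is split into ceil(n / limit) files.
    return sum(
        math.ceil(n / max_records_per_file)
        for n in rows_per_partition
        if n > 0
    )
```

Under this assumption, one partition of 250 rows with a limit of 100 gives 3 files, while 200 partitions of 2 rows each still give 200 files. The option only caps file size; it does not merge small partitions, so for the case in the question it needs to be combined with repartition() or coalesce().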