I have monthly Revenue data for the last 5 years, and I am storing the DataFrames for the respective months in parquet format in append mode, partitioned by the month column. Here is the pseudo-code below -
def Revenue(filename):
    df = spark.read.load(filename)
    # ...
    # ...
    df.write.format('parquet').mode('append').partitionBy('month').save('/path/Revenue')
Revenue('Revenue_201501.csv')
Revenue('Revenue_201502.csv')
Revenue('Revenue_201503.csv')
Revenue('Revenue_201504.csv')
Revenue('Revenue_201505.csv')
The df gets stored in parquet format on a monthly basis, with one subdirectory per month under /path/Revenue (month=2015-01-01, month=2015-02-01, and so on).
Question: How can I delete the parquet folder corresponding to a particular month?
One way would be to load all these parquet files into one big df, use .where() to filter out that particular month, and then save it back in parquet format, partitioned by month, in overwrite mode, like this -
# If we want to remove data from Feb, 2015
from pyspark.sql.functions import col, lit

df = spark.read.format('parquet').load('Revenue.parquet')
df = df.where(col('month') != lit('2015-02-01'))
df.write.format('parquet').mode('overwrite').partitionBy('month').save('/path/Revenue')
But this approach is quite cumbersome. The other way is to directly delete the folder of that particular month, but I am not sure if that's the right way to approach things, lest we alter the metadata in an unforeseeable way.
What would be the right way to delete the parquet data for a particular month?
Spark supports dropping partitions, both data and metadata. Quoting the Scala code comment:
/**
* Drop Partition in ALTER TABLE: to drop a particular partition for a table.
*
* This removes the data and metadata for this partition.
* The data is actually moved to the .Trash/Current directory if Trash is configured,
* unless 'purge' is true, but the metadata is completely lost.
* An error message will be issued if the partition does not exist, unless 'ifExists' is true.
* Note: purge is always false when the target is a view.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
* }}}
*/
In your case, there is no backing table. We could register the DataFrame as a temp table and use the above syntax (see the temp table documentation). From PySpark, we can run that SQL with spark.sql().
Sample:
df = spark.read.format('parquet').load('Revenue.parquet')
df.registerTempTable("tmp")
spark.sql("ALTER TABLE tmp DROP IF EXISTS PARTITION (month='2015-02-01') PURGE")
The statement below will only delete the metadata related to the partition:
ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date="2019-08-22");
If you want to delete the data as well, you need to set the EXTERNAL tblproperty of your Hive external table to FALSE. This turns your Hive table into a managed table:
alter table db.yourtable set TBLPROPERTIES('EXTERNAL'='FALSE');
You can set it back to an external table afterwards:
alter table db.yourtable set TBLPROPERTIES('EXTERNAL'='TRUE');
I tried setting these properties through the Spark session but ran into an issue:
spark.sql("""alter table db.test_external set tblproperties ("EXTERNAL"="TRUE")""")
pyspark.sql.utils.AnalysisException: u"Cannot set or change the preserved property key: 'EXTERNAL';"
I am sure there must be some way to do this. I ended up using Python: I defined the function below in PySpark and it did the job.
query=""" hive -e 'alter table db.yourtable set tblproperties ("EXTERNAL"="FALSE");ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date="2019-08-22");' """
def delete_partition():
print("I am here")
import subprocess
import sys
p=subprocess.Popen(query,shell=True,stderr=subprocess.PIPE)
stdout,stderr = p.communicate()
if p.returncode != 0:
print stderr
sys.exit(1)
>>> delete_partition()
This will delete both the metadata and the data.
Note: I have tested this with a Hive ORC external partitioned table, which is partitioned on loaded_date:
# Partition Information
# col_name data_type comment
loaded_date string
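After running the drop, you can confirm the partition is gone by listing the table's remaining partitions (a small sketch, assuming the same db.yourtable and that Hive support is enabled in the Spark session):
# The dropped loaded_date should no longer appear in this list
spark.sql("SHOW PARTITIONS db.yourtable").show(truncate=False)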
Update:
Basically your data is lying at the HDFS location in subdirectories named like
/Revenue/month=2015-02-01
/Revenue/month=2015-03-01
/Revenue/month=2015-04-01
and so on
def delete_partition(month_delete):
    print("I am here")
    hdfs_path = "/some_hdfs_location/Revenue/month="
    final_path = hdfs_path + month_delete
    import subprocess
    # Remove the partition directory for the given month from HDFS
    subprocess.call(["hadoop", "fs", "-rm", "-r", final_path])
    print("got deleted")

delete_partition("2015-02-01")