How to delete a particular month from a parquet file

Posted 2020-02-13 05:11

Question:

I have monthly Revenue data for the last 5 years, and I store the DataFrame for each month in parquet format in append mode, partitioned by the month column. Here is the pseudo-code:

def Revenue(filename):
    df = spark.read.load(filename)
    .
    .
    df.write.format('parquet').mode('append').partitionBy('month').save('/path/Revenue')

Revenue('Revenue_201501.csv')
Revenue('Revenue_201502.csv')
Revenue('Revenue_201503.csv')
Revenue('Revenue_201504.csv')
Revenue('Revenue_201505.csv')

The df gets stored in parquet format on a monthly basis, one partition folder per month (e.g. month=2015-01-01, month=2015-02-01, and so on).

Question: How can I delete the parquet folder corresponding to a particular month?

One way would be to load all these parquet files into one big df, use a .where() clause to filter out that particular month, and then save it back in parquet format, partitioned by month, in overwrite mode, like this:

# If we want to remove data from Feb, 2015
from pyspark.sql.functions import col, lit

df = spark.read.format('parquet').load('Revenue.parquet')
df = df.where(col('month') != lit('2015-02-01'))
df.write.format('parquet').mode('overwrite').partitionBy('month').save('/path/Revenue')

But this approach is quite cumbersome.

The other way is to delete the folder for that particular month directly, but I am not sure whether that is the right way to approach things, lest we alter the metadata in some unforeseeable way.

What would be the right way to delete the parquet data for a particular month?

Answer 1:

Spark supports dropping a partition: both the data and the metadata.
Quoting the Scala code comment:

/**
 * Drop Partition in ALTER TABLE: to drop a particular partition for a table.
 *
 * This removes the data and metadata for this partition.
 * The data is actually moved to the .Trash/Current directory if Trash is configured,
 * unless 'purge' is true, but the metadata is completely lost.
 * An error message will be issued if the partition does not exist, unless 'ifExists' is true.
 * Note: purge is always false when the target is a view.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
 * }}}
 */

In your case, there is no backing table. We could register the DataFrame as a temp table and use the above syntax (see the temp table documentation).

From PySpark, we can run the SQL via spark.sql. Sample:

df = spark.read.format('parquet').load('Revenue.parquet')
df.registerTempTable("tmp")
spark.sql("ALTER TABLE tmp DROP IF EXISTS PARTITION (month='2015-02-01') PURGE")


Answer 2:

The statement below will only delete the metadata related to the partition:

ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date="2019-08-22");

If you want to delete the data as well, you need to set the EXTERNAL table property of your Hive external table to FALSE. This turns your Hive table into a managed table:

alter table db.yourtable set TBLPROPERTIES('EXTERNAL'='FALSE');

Afterwards you can set it back to an external table:

alter table db.yourtable set TBLPROPERTIES('EXTERNAL'='TRUE');

I tried setting these properties through the Spark session, but ran into issues:

 spark.sql("""alter table db.test_external set tblproperties ("EXTERNAL"="TRUE")""")
pyspark.sql.utils.AnalysisException: u"Cannot set or change the preserved property key: 'EXTERNAL';"

I am sure there must be some way to do this. I ended up using Python: I defined the function below in PySpark and it did the job.

query=""" hive -e 'alter table db.yourtable set tblproperties ("EXTERNAL"="FALSE");ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date="2019-08-22");' """

def delete_partition():
        print("I am here")
        import subprocess
        import sys
        p=subprocess.Popen(query,shell=True,stderr=subprocess.PIPE)
        stdout,stderr = p.communicate()
        if p.returncode != 0:
            print stderr
            sys.exit(1) 

>>> delete_partition()

This will delete both the metadata and the data. Note: I have tested this with a Hive ORC external partitioned table, which is partitioned on loaded_date:

# Partition Information
# col_name              data_type               comment

loaded_date             string

Update: Basically, your data is lying at the HDFS location in subdirectories named

/Revenue/month=2015-02-01
/Revenue/month=2015-03-01
/Revenue/month=2015-04-01

and so on

def delete_partition(month_delete):
    print("I am here")
    hdfs_path = "/some_hdfs_location/Revenue/month="
    final_path = hdfs_path + month_delete
    import subprocess
    # Remove the partition directory for the given month from HDFS
    subprocess.call(["hadoop", "fs", "-rm", "-r", final_path])
    print("got deleted")

delete_partition("2015-02-01")