Overwrite specific partitions in spark dataframe w

Posted 2019-01-04 23:49

I want to overwrite specific partitions instead of all of them in Spark. I am trying the following command:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')

where df is a DataFrame containing the incremental data to be overwritten.

hdfs-base-path contains the master data.

When I run the above command, it deletes all existing partitions and writes only those present in df to the HDFS path.

My requirement is to overwrite only the partitions present in df at the specified HDFS path. Can someone please help me with this?

9 answers
不美不萌又怎样
#2 · 2019-01-05 00:20

Finally! This is now a feature in Spark 2.3.0: https://issues.apache.org/jira/browse/SPARK-20236

To use it, the spark.sql.sources.partitionOverwriteMode setting must be set to dynamic, the dataset must be partitioned, and the write mode must be overwrite. Example:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

I recommend doing a repartition based on your partition column before writing, so you won't end up with 400 files per folder.
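To make the difference between the two modes concrete, here is a toy, pure-Python model (not Spark code) of what an overwrite does to a partitioned dataset, with partitions represented as a hypothetical dict from partition value to rows. The comment at the top sketches the corresponding PySpark 2.3+ calls for the question's path-based ORC write, assuming the question's `df`, `col4` and path and a SparkSession named `spark`.

```python
# Corresponding PySpark (2.3+) calls for the question's path-based write, for
# reference only; `spark`, `df`, "col4" and the path are assumed from the question:
#   spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
#   (df.repartition("col4")
#      .write.mode("overwrite")
#      .partitionBy("col4")
#      .orc("maprfs:///hdfs-base-path"))

from typing import Dict, List


def overwrite(existing: Dict[str, List[str]],
              incoming: Dict[str, List[str]],
              mode: str = "static") -> Dict[str, List[str]]:
    """Toy model of partitionOverwriteMode on {partition value: rows} dicts."""
    if mode == "static":
        # Static (the default): all existing partitions are dropped first,
        # so only the partitions present in the incoming data survive.
        return {k: list(v) for k, v in incoming.items()}
    if mode == "dynamic":
        # Dynamic: only partitions that appear in the incoming data are
        # replaced; every other existing partition is kept as it was.
        result = {k: list(v) for k, v in existing.items()}
        result.update({k: list(v) for k, v in incoming.items()})
        return result
    raise ValueError(f"unknown mode: {mode!r}")


master = {"a": ["r1"], "b": ["r2"], "c": ["r3"]}
increment = {"b": ["r2-new"]}
print(overwrite(master, increment, "static"))   # {'b': ['r2-new']}
print(overwrite(master, increment, "dynamic"))  # {'a': ['r1'], 'b': ['r2-new'], 'c': ['r3']}
```

The static case reproduces the behaviour described in the question; the dynamic case is what the question asks for.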

Before Spark 2.3.0, the best solution would be to run SQL statements to delete those partitions and then write them with mode append.
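The delete statements for that pre-2.3 approach could be generated from the distinct partition values, roughly as below. This is a sketch: the table name `db.master` and column `col4` are hypothetical, and backslash escaping of quotes is assumed to match how your Hive/Spark SQL version parses string literals.

```python
from typing import Iterable, List


def quote_sql_string(value: object) -> str:
    """Render a value as a single-quoted SQL string literal (backslash escaping)."""
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{text}'"


def drop_partition_statements(table: str, column: str,
                              values: Iterable[object]) -> List[str]:
    """Build one ALTER TABLE ... DROP IF EXISTS PARTITION statement per distinct value."""
    # De-duplicate as strings and sort, so the output order is stable.
    unique = sorted({str(v) for v in values})
    return [
        f"ALTER TABLE {table} DROP IF EXISTS PARTITION ({column}={quote_sql_string(v)})"
        for v in unique
    ]


for stmt in drop_partition_statements("db.master", "col4",
                                      ["2019-01-01", "2019-01-02", "2019-01-01"]):
    print(stmt)
```

With a SparkSession `spark`, each statement would be passed to `spark.sql(stmt)` before writing df with `mode("append")`. Keep in mind that in Hive, dropping a partition of an EXTERNAL table removes only the metadata and leaves the files in place, so the directories may also need to be deleted.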

聊天终结者
#3 · 2019-01-05 00:20

Using Spark 1.6...

The HiveContext can simplify this process greatly. The key is that you must create the table in Hive first using a CREATE EXTERNAL TABLE statement with partitioning defined. For example:

-- Hive SQL
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/tables/test';

From here, let's say you have a Dataframe with new records in it for a specific partition (or multiple partitions). You can use a HiveContext SQL statement to perform an INSERT OVERWRITE using this Dataframe, which will overwrite the table for only the partitions contained in the Dataframe:

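# PySpark
from pyspark.sql import HiveContext

hiveContext = HiveContext(sc)
# A PARTITION clause with only dynamic columns is rejected in Hive's default "strict" mode
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
update_dataframe.registerTempTable('update_dataframe')

# Only the age partitions present in update_dataframe are overwritten
hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
                   SELECT name, age
                   FROM update_dataframe""")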

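Note: update_dataframe in this example has a schema that matches that of the target test table, and the partition column (age) is selected last, because Hive maps the trailing SELECT columns to the dynamic partition columns by position.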
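When the DataFrame has more columns or several partition columns, it is easy to get that column order wrong. A small, hypothetical helper that builds the statement with the partition columns moved to the end might look like this; it assumes `columns` lists the data columns in the same order as the target table.

```python
from typing import Sequence


def build_insert_overwrite(table: str, source: str,
                           columns: Sequence[str],
                           partition_cols: Sequence[str]) -> str:
    """Build a dynamic-partition INSERT OVERWRITE with partition columns last."""
    missing = [c for c in partition_cols if c not in columns]
    if missing:
        raise ValueError(f"partition columns not in source columns: {missing}")
    # Hive maps the trailing SELECT columns to PARTITION (...) by position,
    # so data columns go first and partition columns last, in clause order.
    data_cols = [c for c in columns if c not in partition_cols]
    select_list = ", ".join(list(data_cols) + list(partition_cols))
    return (f"INSERT OVERWRITE TABLE {table} "
            f"PARTITION ({', '.join(partition_cols)}) "
            f"SELECT {select_list} FROM {source}")


print(build_insert_overwrite("test", "update_dataframe", ["age", "name"], ["age"]))
# INSERT OVERWRITE TABLE test PARTITION (age) SELECT name, age FROM update_dataframe
```

The resulting string would then be passed to `hiveContext.sql(...)` as in the example above.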

One easy mistake to make with this approach is to skip the CREATE EXTERNAL TABLE step in Hive and just make the table using the Dataframe API's write methods. For Parquet-based tables in particular, the table will not be defined appropriately to support Hive's INSERT OVERWRITE... PARTITION function.

Hope this helps.

对你真心纯属浪费
#4 · 2019-01-05 00:25

I would suggest cleaning up first and then writing the new partitions in Append mode:

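import scala.sys.process._
import org.apache.spark.sql.SaveMode

// Assumes df, path (the base directory) and partitionColumn are already defined.
// Shells out to the HDFS CLI; -f keeps rm from failing if the directory doesn't exist yet.
def deletePath(path: String): Unit = {
    s"hdfs dfs -rm -r -f -skipTrash $path".!
}

// Delete only the partition directories that df is about to rewrite
df.select(partitionColumn).distinct.collect().foreach { p =>
    val partition = p.get(0).toString  // also works for non-String partition columns
    deletePath(s"$path/$partitionColumn=$partition")
}

// Append the new data; partitions not present in df are left untouched
df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path)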
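The Scala snippet above handles a single partition column. For several partition columns (e.g. a hypothetical `partitionBy("year", "month")`), the directories to delete can be derived from the distinct partition tuples. The Python sketch below assumes plain values that Spark writes to directory names unchanged (Spark escapes characters such as '/' or '=' in partition directory names), and that the rows come from something like `df.select(*cols).distinct().collect()` converted with `Row.asDict()`.

```python
from typing import Dict, Iterable, List, Sequence


def partition_paths(base: str, partition_cols: Sequence[str],
                    rows: Iterable[Dict[str, object]]) -> List[str]:
    """Return the distinct Hive-style partition directories touched by rows."""
    base = base.rstrip("/")
    paths = set()
    for row in rows:
        # Hive-style layout: base/col1=v1/col2=v2, in partitionBy() order.
        parts = [f"{col}={row[col]}" for col in partition_cols]
        paths.add("/".join([base] + parts))
    return sorted(paths)


rows = [
    {"year": 2019, "month": 1},
    {"year": 2019, "month": 1},
    {"year": 2018, "month": 12},
]
for p in partition_paths("/data/master/", ["year", "month"], rows):
    print(p)
```

Each returned path would then be passed to a delete helper such as `deletePath` before the append.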

This deletes only the partitions present in df. After writing the data, run this command if you need to update the metastore:

sparkSession.sql(s"MSCK REPAIR TABLE $db.$table")

Note: deletePath assumes that the hdfs command is available on your system.
