Corrupt rows written to __HIVE_DEFAULT_PARTITION__

Published 2019-08-04 05:20

Question:

I am seeing some very odd behaviour when attempting to overwrite a partition in a Hive table using Spark 2.3.

Firstly, I set the following option when building my SparkSession:

.config("spark.sql.sources.partitionOverwriteMode", "dynamic")

I then copy some data into a new table, partitioned by the date_id column.

ds
  .write
  .format("parquet")
  .option("compression", "snappy")
  .option("auto.purge", "true")
  .mode(saveMode)
  .partitionBy("date_id")
  .saveAsTable("tbl_copy")

I can see in HDFS that the relevant date_id directories have been created.

I then create a Dataset containing data for the single date_id partition I wish to overwrite, and insert it into Hive as follows:

  ds
    .write
    .mode(SaveMode.Overwrite)
    .insertInto("tbl_copy")

As a sanity check I write the same Dataset to a new table.

      ds
        .write
        .format("parquet")
        .option("compression", "snappy")
        .option("auto.purge", "true")
        .mode(SaveMode.Overwrite)
        .saveAsTable("tmp_tbl")

The data in tmp_tbl is exactly as expected.

However, when I look at tbl_copy I see a new HDFS directory date_id=__HIVE_DEFAULT_PARTITION__.

Querying tbl_copy with

SELECT * from tbl_copy WHERE date_id IS NULL

I see the rows that should have been inserted into partition date_id=20180523; however, the date_id column is null and an unrelated row_changed column has been populated with the value 20180523.

It appears the insert into Hive is somehow causing my data to get mangled. Writing the same Dataset into a new table causes no issues.

Could anyone shed any light on this?

Answer 1:

So it appears that partition columns must be the last ones in the Dataset.

I have solved the problem by pimping the following method onto Dataset[T] via an implicit class, which moves the partition columns to the end of the column list.

import org.apache.spark.sql.{Dataset, Encoder}

implicit class PartitionOps[T: Encoder](dataset: Dataset[T]) {
  // Put the partition columns last so insertInto's position-based resolution works.
  def partitionsTail(partitionColumns: Seq[String]): Dataset[T] = {
    val columns = dataset.schema
      .collect { case s if !partitionColumns.contains(s.name) => s.name } ++ partitionColumns
    dataset.select(columns.head, columns.tail: _*).as[T]
  }
}
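
With that in place, the overwrite from the question can be rewritten so the partition column ends up last before the insert. A minimal usage sketch (it assumes the implicit class above is in scope):

ds
  .partitionsTail(Seq("date_id"))
  .write
  .mode(SaveMode.Overwrite)
  .insertInto("tbl_copy")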


Answer 2:

Yes, this is tricky behaviour; it is explained in the docs:

https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/DataFrameWriter.html#insertInto(java.lang.String)

Unlike saveAsTable, insertInto ignores the column names and just uses position-based resolution. For example:

    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
    scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
    scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
    scala> sql("select * from t1").show
    +---+---+
    |  i|  j|
    +---+---+
    |  5|  6|
    |  3|  4|
    |  1|  2|
    +---+---+
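
In other words, insertInto can be made to behave as if it matched by name by reordering the DataFrame's columns to the target table's layout first. A small sketch continuing the example above (not part of the original answer):

    scala> Seq((7, 8)).toDF("j", "i").select("i", "j").write.insertInto("t1")
    // reordering with select puts i=8 and j=7 into the columns their names suggest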