Creating a Hive table using Parquet file metadata

Posted 2019-03-11 10:59

I wrote a DataFrame out as a Parquet file, and I would like to read it in Hive, using the metadata from the Parquet file itself.

Output from the Parquet write:

_common_metadata  part-r-00000-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet  part-r-00002-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet  _SUCCESS
_metadata         part-r-00001-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet  part-r-00003-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet

Hive table

CREATE  TABLE testhive
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  '/home/gz_files/result';



FAILED: SemanticException [Error 10043]: Either list of columns or a custom serializer should be specified

How can I infer the metadata from the Parquet file?

If I open _common_metadata, it has the content below:

PAR1LHroot
%TSN%
%TS%
%Etype%
)org.apache.spark.sql.parquet.row.metadata▒{"type":"struct","fields":[{"name":"TSN","type":"string","nullable":true,"metadata":{}},{"name":"TS","type":"string","nullable":true,"metadata":{}},{"name":"Etype","type":"string","nullable":true,"metadata":{}}]}

Or, how can I parse the metadata file?

5 Answers
forever°为你锁心
Answer 2 · 2019-03-11 11:36

A small improvement over Victor's answer (adding backticks around field.name), modified to bind the table to a local Parquet file (tested on Spark 1.6.1):

def dataFrameToDDL(dataFrame: DataFrame, tableName: String, absFilePath: String): String = {
  // Build one "`name` TYPE" entry per column from the DataFrame's schema.
  val columns = dataFrame.schema.map { field =>
    "  `" + field.name + "` " + field.dataType.simpleString.toUpperCase
  }
  s"CREATE EXTERNAL TABLE $tableName (\n${columns.mkString(",\n")}\n) STORED AS PARQUET LOCATION '$absFilePath'"
}

Also notice that:

  • A HiveContext is needed, since SQLContext does not support creating external tables (see the sketch after this list).
  • The path to the Parquet folder must be an absolute path.
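
A minimal sketch of how the generated DDL might be used end to end, assuming a Spark 1.6-style HiveContext in spark-shell; the table name and path are the ones from the question:

import org.apache.spark.sql.hive.HiveContext

// Assumes an existing SparkContext named sc (as in spark-shell).
val hiveContext = new HiveContext(sc)

// Read the Parquet output from the question and generate the DDL against its absolute path.
val df = hiveContext.read.parquet("/home/gz_files/result")
val ddl = dataFrameToDDL(df, "testhive", "/home/gz_files/result")

// Execute the CREATE EXTERNAL TABLE statement so Hive can query the existing files.
hiveContext.sql(ddl)
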
萌系小妹纸
Answer 3 · 2019-03-11 11:42

Actually, Impala supports

CREATE TABLE LIKE PARQUET

(no column list at all):

http://www.cloudera.com/content/www/en-us/documentation/archive/impala/2-x/2-1-x/topics/impala_create_table.html
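
A rough sketch of that syntax, based on the linked documentation; the paths are the result directory and _common_metadata file from the question, and the table name is just a placeholder:

CREATE EXTERNAL TABLE testhive
  LIKE PARQUET '/home/gz_files/result/_common_metadata'
  STORED AS PARQUET
  LOCATION '/home/gz_files/result';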

Your question is tagged "hive" and "spark", and I don't see this implemented in Hive, but if you use CDH, it may be what you were looking for.

霸刀☆藐视天下
Answer 4 · 2019-03-11 11:44

Here's a solution I've come up with to get the metadata from parquet files in order to create a Hive table.

First, start a spark-shell (or compile it all into a JAR and run it with spark-submit, but the shell is so much easier):

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame


val df = sqlContext.parquetFile("/path/to/_common_metadata")

def creatingTableDDL(tableName: String, df: DataFrame): String = {
  val cols = df.dtypes
  var ddl1 = "CREATE EXTERNAL TABLE " + tableName + " ("
  // Look at the data types and column names and put them into a string.
  val colCreate = (for (c <- cols) yield (c._1 + " " + c._2.replace("Type", ""))).mkString(", ")
  ddl1 += colCreate + ") STORED AS PARQUET LOCATION '/wherever/you/store/the/data/'"
  ddl1
}

val test_tableDDL = creatingTableDDL("test_table", df)

This gives you the data types that Hive will use for each column, as they are stored in Parquet, e.g.: CREATE EXTERNAL TABLE test_table (COL1 Decimal(38,10), COL2 String, COL3 Timestamp) STORED AS PARQUET LOCATION '/path/to/parquet/files'
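
As a quick sanity check before generating the DDL, you can print the schema Spark inferred from _common_metadata; for the files in the question it should look roughly like this (output shown as comments):

df.printSchema()
// root
//  |-- TSN: string (nullable = true)
//  |-- TS: string (nullable = true)
//  |-- Etype: string (nullable = true)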

The star\"
Answer 5 · 2019-03-11 11:44

I had the same question. It might be hard to implement from the practical side, though, as Parquet supports schema evolution:

http://www.cloudera.com/content/www/en-us/documentation/archive/impala/2-x/2-0-x/topics/impala_parquet.html#parquet_schema_evolution_unique_1

For example, you could add a new column to your table without having to touch the data that is already in it. Only the new data files will have the new metadata (compatible with the previous version).

Schema merging has been switched off by default since Spark 1.5.0, because it is a "relatively expensive operation" (http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging), so inferring the most recent schema may not be as simple as it sounds. Quick-and-dirty approaches are quite possible, though, e.g. by parsing the output of:

$ parquet-tools schema /home/gz_files/result/000000_0
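
If you do want Spark to reconcile the schemas across data files, merging can be re-enabled per read, as described in the Spark SQL guide linked above; a sketch, using the result directory from the question:

val mergedDf = sqlContext.read
  .option("mergeSchema", "true")
  .parquet("/home/gz_files/result")
mergedDf.printSchema()
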
霸刀☆藐视天下
Answer 6 · 2019-03-11 11:55

I'd just like to expand on James Tobin's answer. There's a StructField class which provides Hive's data types without doing string replacements.

// Tested on Spark 1.6.0.

import org.apache.spark.sql.DataFrame

def dataFrameToDDL(dataFrame: DataFrame, tableName: String): String = {
    val columns = dataFrame.schema.map { field =>
        "  " + field.name + " " + field.dataType.simpleString.toUpperCase
    }

    s"CREATE TABLE $tableName (\n${columns.mkString(",\n")}\n)"
}

This solves the IntegerType problem.

scala> val dataFrame = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("x", "y")
dataFrame: org.apache.spark.sql.DataFrame = [x: int, y: string]

scala> print(dataFrameToDDL(dataFrame, "t"))
CREATE TABLE t (
  x INT,
  y STRING
)

This should work with any DataFrame, not just with Parquet. (e.g., I'm using this with a JDBC DataFrame.)

As an added bonus, if your target DDL supports nullable columns, you can extend the function by checking StructField.nullable.
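
A sketch of that extension, assuming NOT NULL is the only nullability clause your target DDL needs (same imports as above; the function name is just illustrative):

def dataFrameToDDLWithNullability(dataFrame: DataFrame, tableName: String): String = {
  val columns = dataFrame.schema.map { field =>
    // Append NOT NULL for columns whose StructField.nullable flag is false.
    val constraint = if (field.nullable) "" else " NOT NULL"
    "  " + field.name + " " + field.dataType.simpleString.toUpperCase + constraint
  }
  s"CREATE TABLE $tableName (\n${columns.mkString(",\n")}\n)"
}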
