Spark SQL on ORC files doesn't return correct

2019-01-15 20:41发布

I have a directory containing ORC files. I am creating a DataFrame using the below code

var data = sqlContext.sql("SELECT * FROM orc.`/directory/containing/orc/files`");

It returns data frame with this schema

[_col0: int, _col1: bigint]

Where as the expected schema is

[scan_nbr: int, visit_nbr: bigint]

When I query on files in parquet format I get correct schema.

Am I missing any configuration(s)?

Adding more details

This is Hortonworks Distribution HDP 2.4.2 (Spark 1.6.1, Hadoop 2.7.1, Hive 1.2.1)

We haven't changed the default configurations of HDP, but this is definitely not the same as the plain vanilla version of Hadoop.

Data is written by upstream Hive jobs, a simple CTAS (CREATE TABLE sample STORED AS ORC as SELECT ...).

I tested this on filed generated by CTAS with the latest 2.0.0 hive & it preserves the column names in the orc files.

5条回答
男人必须洒脱
2楼-- · 2019-01-15 20:53

Setting

sqlContext.setConf('spark.sql.hive.convertMetastoreOrc', 'false')

fixes this.

查看更多
Emotional °昔
3楼-- · 2019-01-15 20:55

If version upgrade is not an available option, quick fix could be to rewrite ORC file using PIG. That seems to work just fine.

查看更多
We Are One
4楼-- · 2019-01-15 20:57

We can use:

val df = hiveContext.read.table("tableName")

Your df.schema or df.columns will give actual column names.

查看更多
走好不送
5楼-- · 2019-01-15 21:04

The problem is the Hive version, which is 1.2.1, which has this bug HIVE-4243

This was fixed in 2.0.0.

查看更多
三岁会撩人
6楼-- · 2019-01-15 21:11

If you have the parquet version as well, you can just copy the column names over, which is what I did (also, the date column was partition key for orc so had to move it to the end):

tx = sqlContext.table("tx_parquet")
df = sqlContext.table("tx_orc")
tx_cols = tx.schema.names
tx_cols.remove('started_at_date')
tx_cols.append('started_at_date') #move it to end
#fix column names for orc
oldColumns = df.schema.names
newColumns = tx_cols
df = functools.reduce(
    lambda df, idx: df.withColumnRenamed(
        oldColumns[idx], newColumns[idx]), range(
            len(oldColumns)), df)
查看更多
登录 后发表回答