How to write a table to hive from spark without us

Posted 2020-03-30 07:27


When trying to use Spark 2.3 on HDP 3.1 to write to a Hive table directly into Hive's schema, without the warehouse connector, using:

spark-shell --driver-memory 16g --master local[3] --conf spark.hadoop.metastore.catalog.default=hive
val df = Seq(1,2,3,4).toDF
spark.sql("create database foo")

fails with:

Table foo.my_table_01 failed strict managed table checks due to the following reason: Table is marked as a managed table but is not transactional

but a:

val df = Seq(1,2,3,4).toDF.withColumn("part", col("value"))
df.write.partitionBy("part").option("compression", "zlib").mode(SaveMode.Overwrite).format("orc").saveAsTable("foo.my_table_02")

Reading the table back in Spark with spark.sql("select * from foo.my_table_02").show works just fine. Now going to Hive / beeline:

0: jdbc:hive2://hostname:2181/> select * from my_table_02;
Error: java.lang.IllegalArgumentException: bucketId out of range: -1 (state=,code=0)


 describe extended my_table_02;


|          col_name           |                     data_type                      | comment  |
| value                       | int                                                |          |
| part                        | int                                                |          |
|                             | NULL                                               | NULL     |
| # Partition Information     | NULL                                               | NULL     |
| # col_name                  | data_type                                          | comment  |
| part                        | int                                                |          |
|                             | NULL                                               | NULL     |
| Detailed Table Information  | Table(tableName:my_table_02, dbName:foo, owner:hive/, createTime:1571201905, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:value, type:int, comment:null), FieldSchema(name:part, type:int, comment:null)], location:hdfs://,,, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,, parameters:{path=hdfs://, compression=zlib, serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[FieldSchema(name:part, type:int, comment:null)], parameters:{numRows=0, rawDataSize=0, spark.sql.sources.schema.partCol.0=part, transient_lastDdlTime=1571201906, bucketing_version=2, spark.sql.create.version=, totalSize=740, spark.sql.sources.schema.numPartCols=1, spark.sql.sources.schema.part.0={\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}, numFiles=4, numPartitions=4, spark.sql.partitionProvider=catalog, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=orc, transactional=true}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, rewriteEnabled:false, catName:hive, ownerType:USER, writeId:-1) |

How can I use Spark to write to Hive without using the warehouse connector, but still write to the same metastore so that the table can later be read by Hive? To the best of my knowledge, external tables should make this possible (they are not managed, not ACID, not transactional), but I am not sure how to tell saveAsTable to create one.


related issues:

  • Table loaded through Spark not accessible in Hive
    • setting the properties proposed in the answer there does not solve my issue
  • seems also to be a bug:

There might be a workaround, but I do not like the idea of using more duct tape where I have not seen any large-scale performance tests yet. Also, this means changing all existing Spark jobs.

In fact, "Cant save table to hive metastore, HDP 3.0" reports issues with large data frames and the warehouse connector.


I just found


execute() vs executeQuery()

ExecuteQuery() will always use the Hiveserver2-interactive/LLAP as it uses the fast ARROW protocol. Using it when the JDBC URL points to the non-LLAP Hiveserver2 will yield an error.

Execute() uses JDBC and does not have this dependency on LLAP, but has a built-in restriction to only return 1,000 records max. But for most queries (INSERT INTO ... SELECT, count, sum, average) that is not a problem.

But doesn't this kill any high-performance interoperability between hive and spark? Especially if there are not enough LLAP nodes available for large scale ETL.

In fact, this is true. This setting can be configured, though I am not sure of the performance impact of increasing this value.


Did you try

    data.write \
        .mode("append") \


In Ambari, simply disabling the option to create transactional tables by default solves my problem.

Set to false twice (Tez, LLAP):

hive.strict.managed.tables = false

Then enable it manually in each table's properties if desired (to use a transactional table).
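
Opting a single table back into ACID after this change could look roughly like the sketch below. It only builds the HiveQL string; the helper name `transactionalTableDdl` and the example table and columns are made up for illustration, and the statement would still have to be run through beeline or whichever Hive client is in use.

```scala
// Hypothetical helper (not part of Spark or Hive): builds a CREATE TABLE
// statement that marks one table as transactional explicitly, for clusters
// where ACID is no longer the default for managed tables.
object TransactionalDdl {
  def transactionalTableDdl(table: String, columns: Seq[(String, String)]): String = {
    // Render each (name, type) pair as "name type" and join with commas.
    val cols = columns.map { case (name, tpe) => s"$name $tpe" }.mkString(", ")
    s"""CREATE TABLE $table ($cols)
       |STORED AS ORC
       |TBLPROPERTIES ('transactional'='true')""".stripMargin
  }
}
```

For example, `TransactionalDdl.transactionalTableDdl("foo.my_acid_table", Seq("value" -> "int"))` yields a statement for a one-column ORC table with the `transactional` property set.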


Creating an external table (as a workaround) seems to be the best option for me. This still involves HWC to register the column metadata or update the partition information.

Something along these lines:

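import com.hortonworks.hwc.HiveWarehouseSession
import org.apache.spark.sql.{DataFrame, SaveMode}

val df: DataFrame = ...
val tableName = "my_db.my_table"
val externalPath = "/warehouse/tablespace/external/hive/my_db.db/my_table"
val hive = HiveWarehouseSession.session(spark).build()

// Write the ORC files directly to the external location, one directory per partition value.
df.write.partitionBy("part_col").option("compression", "zlib").mode(SaveMode.Overwrite).orc(externalPath)

// Derive the non-partition column definitions from the DataFrame schema.
val columns = df.drop("part_col").schema.fields.map(field => s"${field.name} ${field.dataType.simpleString}").mkString(", ")
val ddl =
      s"""
         |CREATE EXTERNAL TABLE $tableName ($columns)
         |PARTITIONED BY (part_col string)
         |STORED AS ORC
         |LOCATION '$externalPath'
       """.stripMargin

// Register the table in the metastore, then pick up the partition directories.
hive.execute(ddl)
hive.execute(s"MSCK REPAIR TABLE $tableName SYNC PARTITIONS")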

Unfortunately, this throws a:

java.sql.SQLException: The query did not generate a result set!

from HWC
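
One thing that may be worth trying, though nothing in this thread confirms it: as far as I know, HWC also exposes `executeUpdate`, which is intended for statements that do not return rows, whereas `execute` expects a result to hand back. The sketch below keeps the Hive call behind a small hypothetical `DdlRunner` trait so that it runs without a cluster; `DdlRunner`, `registerExternalTable`, and the column list are illustrative names, not part of HWC.

```scala
// Hypothetical abstraction over "run a statement that returns no rows".
// With HWC this could be backed by hive.executeUpdate(sql) (an assumption,
// not something the original post verified).
trait DdlRunner {
  def executeUpdate(sql: String): Boolean
}

object ExternalTableRegistration {
  // Builds the CREATE EXTERNAL TABLE statement for ORC files that were
  // already written to `location`.
  def createExternalTableDdl(
      table: String,
      columns: Seq[(String, String)],
      partitionCol: (String, String),
      location: String): String = {
    val cols = columns.map { case (name, tpe) => s"$name $tpe" }.mkString(", ")
    s"""CREATE EXTERNAL TABLE $table ($cols)
       |PARTITIONED BY (${partitionCol._1} ${partitionCol._2})
       |STORED AS ORC
       |LOCATION '$location'""".stripMargin
  }

  // Registers the table first, then asks Hive to sync the partition directories.
  def registerExternalTable(
      runner: DdlRunner,
      table: String,
      columns: Seq[(String, String)],
      partitionCol: (String, String),
      location: String): Unit = {
    runner.executeUpdate(createExternalTableDdl(table, columns, partitionCol, location))
    runner.executeUpdate(s"MSCK REPAIR TABLE $table SYNC PARTITIONS")
  }
}
```

In a real job the runner would be a thin wrapper such as `new DdlRunner { def executeUpdate(sql: String) = hive.executeUpdate(sql) }`, with the column pairs taken from the DataFrame schema as in the snippet above.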