Read from a Hive table and write back to it using Spark SQL

Posted 2020-05-23 18:10

Question:

I am reading a Hive table using Spark SQL and assigning the result to a Scala val:

val x = sqlContext.sql("select * from some_table")

Then I do some processing on the DataFrame x and finally end up with a DataFrame y, which has exactly the same schema as the table some_table.

Finally, I try to insert overwrite the DataFrame y into the same Hive table some_table:

y.write.mode(SaveMode.Overwrite).insertInto("some_table")

Then I get the error:

org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from

I also tried creating an INSERT OVERWRITE SQL statement and running it with sqlContext.sql(), but that gave me the same error.
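For reference, that SQL attempt presumably looked something like the sketch below; this is a hypothetical reconstruction, since the exact statement and the temp view name y_temp are not shown in the original.

// Hypothetical reconstruction of the attempted INSERT OVERWRITE statement
y.registerTempTable("y_temp")  // assumed temp view name
sqlContext.sql("INSERT OVERWRITE TABLE some_table SELECT * FROM y_temp")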

Is there any way I can bypass this error? I need to insert the records back to the same table.


Hi, I tried doing as suggested, but I am still getting the same error:

val x = sqlContext.sql("select * from incremental.test2")
val y = x.limit(5)
y.registerTempTable("temp_table")
val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("incremental.test2")

scala> dy.write.mode("overwrite").insertInto("incremental.test2")
org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from.;

Answer 1:

You should first save your DataFrame y to a temporary table:

y.write.mode("overwrite").saveAsTable("temp_table")

Then you can overwrite rows in your target table:

val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("some_table")
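Once the overwrite has completed, the intermediate table can be dropped; this cleanup step is an addition, not part of the original answer:

// Optional cleanup of the intermediate table (hedged suggestion)
sqlContext.sql("DROP TABLE IF EXISTS temp_table")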


Answer 2:

Actually, you can also use checkpointing to achieve this. Because checkpointing breaks the data lineage, Spark is no longer able to detect that you are reading from and overwriting the same table:

// checkpointDir can be any HDFS or local directory that Spark is allowed to write to
sqlContext.sparkContext.setCheckpointDir(checkpointDir)
val ds = sqlContext.sql("select * from some_table").checkpoint()
ds.write.mode("overwrite").saveAsTable("some_table")


Answer 3:

You should first save your DataFrame y as a Parquet file (note that "temp_table" here is a filesystem path, not a Hive table name):

y.write.parquet("temp_table")

Then load it back:

val parquetFile = sqlContext.read.parquet("temp_table")

And finally, insert the data into your table:

parquetFile.write.insertInto("some_table")


Answer 4:

In the context of Spark 2.2:

  1. This error means that the process is reading from and writing to the same table.
  2. Normally this should work, because the write first goes to a staging directory (.hive-staging...).
  3. The error occurs with the saveAsTable method, because it overwrites the entire table instead of individual partitions.
  4. The error should not occur with the insertInto method, because it overwrites partitions rather than the whole table.
  5. The reason it happens anyway is that the Hive table has the following Spark TBLPROPERTIES in its definition. The problem goes away for the insertInto method if you remove these TBLPROPERTIES (see the sketch below):

'spark.sql.partitionProvider'
'spark.sql.sources.provider'
'spark.sql.sources.schema.numPartCols'
'spark.sql.sources.schema.numParts'
'spark.sql.sources.schema.part.0'
'spark.sql.sources.schema.part.1'
'spark.sql.sources.schema.part.2'
'spark.sql.sources.schema.partCol.0'
'spark.sql.sources.schema.partCol.1'

https://querydb.blogspot.com/2019/07/read-from-hive-table-and-write-back-to.html
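A hedged sketch of what removing those properties and retrying might look like; UNSET TBLPROPERTIES is standard Hive/Spark SQL DDL, the names some_table and y are carried over from the question, and the ALTER TABLE could equally be run from the Hive CLI or beeline:

// Sketch: drop the Spark-specific table properties, then retry the partition-level overwrite
sqlContext.sql("""
  ALTER TABLE some_table UNSET TBLPROPERTIES (
    'spark.sql.partitionProvider', 'spark.sql.sources.provider',
    'spark.sql.sources.schema.numPartCols', 'spark.sql.sources.schema.numParts',
    'spark.sql.sources.schema.part.0', 'spark.sql.sources.schema.part.1',
    'spark.sql.sources.schema.part.2', 'spark.sql.sources.schema.partCol.0',
    'spark.sql.sources.schema.partCol.1'
  )
""")
y.write.mode("overwrite").insertInto("some_table")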



Answer 5:

Read the data from the Hive table in Spark through HCatalog:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.WritableComparable
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.hive.hcatalog.data.HCatRecord
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat

// Point HCatalog at the Hive table to read
val hconfig = new Configuration()
HCatInputFormat.setInput(hconfig, "dbname", "tablename")

// Obtain a properly parameterised InputFormat class, then read an RDD of (key, HCatRecord) pairs
val inputFormat = (new HCatInputFormat).asInstanceOf[InputFormat[WritableComparable[_], HCatRecord]].getClass
val data = sc.newAPIHadoopRDD(hconfig, inputFormat, classOf[WritableComparable[_]], classOf[HCatRecord])