I have the following program in Scala for Spark:
```scala
val dfA = sqlContext.sql("select * from employees where id in ('Emp1', 'Emp2')")
val dfB = sqlContext.sql("select * from employees where id not in ('Emp1', 'Emp2')")
val dfN = dfA.withColumn("department", lit("Finance"))
val dfFinal = dfN.unionAll(dfB)
dfFinal.registerTempTable("intermediate_result")
dfA.unpersist
dfB.unpersist
dfN.unpersist
dfFinal.unpersist
val dfTmp = sqlContext.sql("select * from intermediate_result")
dfTmp.write.mode("overwrite").format("parquet").saveAsTable("employees")
dfTmp.unpersist
```
When I try to save it, I get the following error:
```
org.apache.spark.sql.AnalysisException: Cannot overwrite table employees that is also being read from.;
	at org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis(rules.scala:106)
	at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$3.apply(rules.scala:182)
	at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$3.apply(rules.scala:109)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:111)
	at org.apache.spark.sql.execution.datasources.PreWriteCheck.apply(rules.scala:109)
	at org.apache.spark.sql.execution.datasources.PreWriteCheck.apply(rules.scala:105)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:218)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:218)
	at scala.collection.immutable.List.foreach(List.scala:318)
```
My questions are:
- Is my approach correct to change the department of two employees?
- Why am I getting this error when I have released the DataFrames?
It is not. Just to repeat something that has been said multiple times on Stack Overflow: Apache Spark is not a database. It is not designed for fine-grained updates. If your project requires operations like this, use one of the many databases available on Hadoop.
Because you didn't. All you've done is add a name to the execution plan. Checkpointing would be the closest thing to "releasing", but you really don't want to end up in a situation where you lose an executor in the middle of a destructive operation.
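For illustration, a minimal sketch of what checkpointing looks like, assuming Spark 2.1+ where `Dataset.checkpoint` is available (the question's `sqlContext` API predates it; the checkpoint directory is a hypothetical path):

```scala
// Checkpointing materializes the plan to reliable storage, so later reads
// no longer depend on the original table.
sqlContext.sparkContext.setCheckpointDir("/tmp/checkpoints") // hypothetical path
val dfChecked = dfFinal.checkpoint() // eager by default; truncates the lineage
```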
You could write to a temporary directory, delete the input, and move the temporary files into place, but really, just use a tool that is fit for the job.
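If you do go that route, it would look roughly like this sketch (the paths are hypothetical, and the window between the delete and the rename is exactly the fragility mentioned above):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical paths; adjust to your environment.
val staging = new Path("/tmp/employees_staging")
val target = new Path("/warehouse/employees")

// 1. Write the result somewhere the input does not live.
dfFinal.write.mode("overwrite").parquet(staging.toString)

// 2. Swap the directories. If the job dies between delete and rename,
//    the data survives only in the staging directory.
val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
fs.delete(target, true)
fs.rename(staging, target)
```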
I would approach it this way. To mimic your flow, I create two DataFrames, do a `union`, and write back to the same table (deliberately removing `department_id = 4` in this example); a sketch follows.
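A rough reconstruction of that flow (the `department_id` column and the scratch path are assumptions, not from the original):

```scala
// Two slices of the table; department_id = 4 is deliberately left out.
val df1 = sqlContext.sql("select * from employees where department_id < 4")
val df2 = sqlContext.sql("select * from employees where department_id > 4")
val dfUnion = df1.unionAll(df2) // union in Spark 2.x

// Materialize to a scratch location first, so the final write no longer
// reads from the table it is overwriting.
dfUnion.write.mode("overwrite").parquet("/tmp/employees_scratch")
sqlContext.read.parquet("/tmp/employees_scratch")
  .write.mode("overwrite").format("parquet").saveAsTable("employees")
```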
Following is an approach you can try:
- Instead of using the `registerTempTable` API, write the result into another table using the `saveAsTable` API.
- Then, write it back into the `employees` table.
- Finally, drop the `intermediate_result` table.
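A minimal sketch of those steps, reusing `dfFinal` from the question:

```scala
// 1. Persist the intermediate result as a real table, not a temp table.
dfFinal.write.mode("overwrite").saveAsTable("intermediate_result")

// 2. Re-read it and overwrite employees; this read no longer touches employees.
sqlContext.table("intermediate_result")
  .write.mode("overwrite").format("parquet").saveAsTable("employees")

// 3. Clean up the intermediate table.
sqlContext.sql("drop table intermediate_result")
```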
Let's say it is a Hive table you are reading and overwriting. Introduce a timestamp into the Hive table location as follows:

- As overwrite is not possible, we will write the output to a new location.
- Write the data to that new location using the DataFrame API.
- Once the data is written, alter the Hive table location to the new location.
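A minimal sketch of this idea, assuming an external Hive table and a hypothetical base path `/data/employees`:

```scala
// Build a fresh, timestamped location so the files currently being read
// are never overwritten in place.
val newLocation = s"/data/employees/${System.currentTimeMillis}"

// Write the updated data to the new location using the DataFrame API.
dfFinal.write.mode("overwrite").parquet(newLocation)

// Repoint the Hive table; subsequent reads pick up the new files.
sqlContext.sql(s"alter table employees set location '$newLocation'")
```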