I have the following program in Scala for Spark:
val dfA = sqlContext.sql("select * from employees where id in ('Emp1', 'Emp2')" )
val dfB = sqlContext.sql("select * from employees where id not in ('Emp1', 'Emp2')" )
val dfN = dfA.withColumn("department", lit("Finance"))
val dfFinal = dfN.unionAll(dfB)
dfFinal.registerTempTable("intermediate_result")
dfA.unpersist
dfB.unpersist
dfN.unpersist
dfFinal.unpersist
val dfTmp = sqlContext.sql("select * from intermediate_result")
dfTmp.write.mode("overwrite").format("parquet").saveAsTable("employees")
dfTmp.unpersist
When I try to save it, I get the following error:
org.apache.spark.sql.AnalysisException: Cannot overwrite table employees
that is also being read from.;
at org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis(rules.scala:106)
at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$3.apply(rules.scala:182)
at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$3.apply(rules.scala:109)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:111)
at org.apache.spark.sql.execution.datasources.PreWriteCheck.apply(rules.scala:109)
at org.apache.spark.sql.execution.datasources.PreWriteCheck.apply(rules.scala:105)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:218)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:218)
at scala.collection.immutable.List.foreach(List.scala:318)
My questions are:
- Is my approach correct to change the department of two employees?
- Why am I getting this error when I have released the DataFrames?
Is my approach correct to change the department of two employees?
It is not. Just to repeat something that has been said multiple times on Stack Overflow: Apache Spark is not a database. It is not designed for fine-grained updates. If your project requires operations like this, use one of the many databases available on Hadoop.
Why am I getting this error when I have released the DataFrames?
Because you didn't. All you've done is add a name to the execution plan. Checkpointing would be the closest thing to "releasing", but you really don't want to end up in a situation where you lose an executor in the middle of a destructive operation.
You could write to a temporary directory, delete the input, and move the temporary files, but really, just use a tool that is fit for the job.
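For completeness, on Spark 2.1+ (where Dataset.checkpoint() is available through a SparkSession) the checkpoint route would look roughly like the sketch below. The checkpoint directory is an assumption, and the caveat above about losing an executor mid-write still applies:
import org.apache.spark.sql.functions.lit

spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // assumed path
val dfA = spark.table("employees").where("id in ('Emp1', 'Emp2')").withColumn("department", lit("Finance"))
val dfB = spark.table("employees").where("id not in ('Emp1', 'Emp2')")
// checkpoint() materializes the result and truncates the lineage,
// so the write below no longer reads from the employees table
val materialized = dfA.union(dfB).checkpoint()
materialized.write.mode("overwrite").saveAsTable("employees")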
Following is an approach you can try.
Instead of using the registerTempTable API, you can write it into another table using the saveAsTable API:
dfFinal.write.mode("overwrite").saveAsTable("intermediate_result")
Then, write it into the employees table:
val dy = sqlContext.table("intermediate_result")
dy.write.mode("overwrite").insertInto("employees")
Finally, drop the intermediate_result table.
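Putting the whole suggestion together, a sketch using the names from the question (the final DROP TABLE is issued as plain SQL):
import org.apache.spark.sql.functions.lit

val dfA = sqlContext.sql("select * from employees where id in ('Emp1', 'Emp2')")
val dfB = sqlContext.sql("select * from employees where id not in ('Emp1', 'Emp2')")
val dfFinal = dfA.withColumn("department", lit("Finance")).unionAll(dfB)

// Materialize into a separate table, so employees is no longer a source of the write
dfFinal.write.mode("overwrite").saveAsTable("intermediate_result")

// Read back from the materialized table and overwrite the original
val dy = sqlContext.table("intermediate_result")
dy.write.mode("overwrite").insertInto("employees")

// Clean up the staging table
sqlContext.sql("DROP TABLE intermediate_result")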
I would approach it this way:
>>> df = sqlContext.sql("select * from t")
>>> df.show()
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
| 2| Fitness|
| 3| Footwear|
| 4| Apparel|
| 5| Golf|
| 6| Outdoors|
| 7| Fan Shop|
+-------------+---------------+
To mimic your flow, I create two DataFrames, union them, and write back to the same table t (deliberately removing department_id = 4 in this example):
>>> df1 = sqlContext.sql("select * from t where department_id < 4")
>>> df2 = sqlContext.sql("select * from t where department_id > 4")
>>> df3 = df1.unionAll(df2)
>>> df3.registerTempTable("df3")
>>> sqlContext.sql("insert overwrite table t select * from df3")
DataFrame[]
>>> sqlContext.sql("select * from t").show()
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
| 2| Fitness|
| 3| Footwear|
| 5| Golf|
| 6| Outdoors|
| 7| Fan Shop|
+-------------+---------------+
Let's say it is a Hive table you are reading and overwriting.
Introduce a timestamp into the Hive table location, as follows:
create table table_name (
  id int,
  dtDontQuery string,
  name string
)
location 'hdfs://user/table_name/timestamp'
As overwriting in place is not possible, we will write the output to a new location.
Write the data to that new location using the DataFrame API:
df.write.orc("hdfs://user/xx/tablename/newtimestamp/")
Once the data is written, alter the Hive table location to point at the new path:
alter table tablename set location 'hdfs://user/xx/tablename/newtimestamp/'
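Putting those steps together in Scala, a minimal sketch might look like this (the table name and base path are illustrative; each run writes under a fresh epoch-seconds directory and then repoints the table):
// Build a fresh, timestamped directory for this run (illustrative base path)
val newLocation = s"hdfs://user/xx/tablename/${System.currentTimeMillis / 1000}/"

// Write the updated data to the new directory
df.write.orc(newLocation)

// Repoint the Hive table at the new directory; the old one can be cleaned up later
sqlContext.sql(s"alter table tablename set location '$newLocation'")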