INSERT IF NOT EXISTS ELSE UPDATE in Spark SQL

Posted 2019-04-13 16:42

Question:

Is there any provision for doing "INSERT IF NOT EXISTS ELSE UPDATE" in Spark SQL?

I have a Spark SQL table "ABC" that has some records. I also have another batch of records that I want to insert into or update in this table, depending on whether they already exist in it.

Is there a SQL command that I can use in a SQL query to make this happen?

Answer 1:

In regular Spark (the DataFrame API rather than a SQL statement), this can be achieved with a full outer join followed by a map, like this:

import spark.implicits._
val df1 = spark.sparkContext.parallelize(List(("id1", "original"), ("id2", "original"))).toDF("df1_id", "df1_status")
val df2 = spark.sparkContext.parallelize(List(("id1", "new"), ("id3","new"))).toDF("df2_id", "df2_status")

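val df3 = df1
  // Full outer join keeps rows that exist on only one side.
  // Joined columns: 0 = df1_id, 1 = df1_status, 2 = df2_id, 3 = df2_status
  .join(df2, $"df1_id" === $"df2_id", "outer")
  .map(row => {
    if (row.isNullAt(2))
      // No incoming record for this id: keep the existing row.
      (row.getString(0), row.getString(1))
    else
      // Incoming record present: insert it, or let it overwrite the existing row.
      (row.getString(2), row.getString(3))
  })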

This yields:

scala> df3.show
+---+--------+
| _1|      _2|
+---+--------+
|id3|     new|
|id1|     new|
|id2|original|
+---+--------+

You could also use select with UDFs instead of map, but in this particular case, where null values are involved, I personally prefer the map variant.
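For comparison, here is a minimal sketch of what a select-based variant might look like. It swaps the UDFs for Spark's built-in coalesce and when functions, which handle nulls directly, so it is a substitution rather than the UDF approach itself. It also shows the same logic as a plain SQL query, since that is what the question asks about. The view names existing and incoming, the output column names id and status, and the local SparkSession are assumptions made for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, when}

// Local session for illustration only; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("upsert-sketch")
  .getOrCreate()
import spark.implicits._

// Same sample data as in the answer above.
val df1 = Seq(("id1", "original"), ("id2", "original")).toDF("df1_id", "df1_status")
val df2 = Seq(("id1", "new"), ("id3", "new")).toDF("df2_id", "df2_status")

// DataFrame variant: full outer join, then prefer the incoming row when one exists.
val merged = df1
  .join(df2, col("df1_id") === col("df2_id"), "outer")
  .select(
    // The id comes from whichever side is present.
    coalesce(col("df2_id"), col("df1_id")).as("id"),
    // Mirrors the map logic: keep the old status only if there is no incoming row.
    when(col("df2_id").isNull, col("df1_status"))
      .otherwise(col("df2_status"))
      .as("status")
  )

// SQL variant of the same logic, using hypothetical temp view names.
df1.createOrReplaceTempView("existing")
df2.createOrReplaceTempView("incoming")

val mergedSql = spark.sql("""
  SELECT COALESCE(df2_id, df1_id) AS id,
         CASE WHEN df2_id IS NULL THEN df1_status ELSE df2_status END AS status
  FROM existing
  FULL OUTER JOIN incoming ON df1_id = df2_id
""")

merged.show()
mergedSql.show()
```

Both variants should produce the same three rows as the map version (row order is not guaranteed). Note that this only computes the merged result as a new DataFrame; writing it back over table "ABC" is a separate step that depends on how the table is stored.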