I need to load data incrementally into a table using Spark (PySpark).
Here's an example:
Day 1
id | value
-----------
1 | abc
2 | def
Day 2
id | value
-----------
2 | cde
3 | xyz
Expected result
id | value
-----------
1 | abc
2 | cde
3 | xyz
This can be done easily in a relational database.
Can it also be done in Spark or another transformation tool, e.g. Presto?
Here you go!
First Dataframe:
>>> list1 = [(1, 'abc'),(2,'def')]
>>> olddf = spark.createDataFrame(list1, ['id', 'value'])
>>> olddf.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
Second Dataframe:
>>> list2 = [(2, 'cde'),(3,'xyz')]
>>> newdf = spark.createDataFrame(list2, ['id', 'value'])
>>> newdf.show();
+---+-----+
| id|value|
+---+-----+
| 2| cde|
| 3| xyz|
+---+-----+
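Now merge the two DataFrames with a full outer join on id, and use the coalesce function in the select so that each id takes the new value when one exists and the old value otherwise. coalesce can also be used to replace any remaining nulls with a user-defined value.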
>>> from pyspark.sql.functions import coalesce
>>> df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value"))
>>> df.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 3| xyz|
| 2| cde|
+---+-----+
I hope this solves your problem. :-)
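To make the per-id behaviour of the full outer join plus coalesce concrete, here is a minimal plain-Python model of it (no Spark involved). The function name upsert and the list-of-tuples input are illustrative assumptions, not part of the answer above. It also shows one consequence of coalesce(newdf.value, olddf.value): a null value in the new data does not overwrite the old value.

```python
def upsert(old_rows, new_rows):
    """Plain-Python model of: full outer join on id, then coalesce(new.value, old.value).

    old_rows and new_rows are lists of (id, value) pairs with unique ids.
    """
    old, new = dict(old_rows), dict(new_rows)
    result = []
    # A full outer join keeps every id that appears on either side.
    for key in sorted(old.keys() | new.keys()):
        new_value = new.get(key)
        # coalesce picks the first non-null argument: new value first, then old.
        value = new_value if new_value is not None else old.get(key)
        result.append((key, value))
    return result


day1 = [(1, "abc"), (2, "def")]
day2 = [(2, "cde"), (3, "xyz")]
merged = upsert(day1, day2)
```

If a null in the new data should overwrite the old value, coalesce on the value column is not enough and the select would need to check which side the id came from.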
Appending DataFrames is done with the union function in PySpark. I'll demonstrate with an example, creating the two DataFrames you mentioned in the question.
from pyspark.sql.types import Row
df1 = sqlContext.createDataFrame([Row(id=1,value="abc"),Row(id=2,value="def")])
df1.show()
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
df2 = sqlContext.createDataFrame([Row(id=2,value="cde"),Row(id=3,value="xyz")])
df2.show()
+---+-----+
| id|value|
+---+-----+
| 2| cde|
| 3| xyz|
+---+-----+
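Let's union the two DataFrames and drop the duplicate ids. Note that dropDuplicates does not guarantee which row it keeps for each id, so although the newer value wins in this small example, that is not guaranteed in general.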
df2.union(df1).dropDuplicates(["id"]).show()
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 3| xyz|
| 2| cde|
+---+-----+
You can sort the output using asc from pyspark.sql.functions:
from pyspark.sql.functions import asc
df2.union(df1).dropDuplicates(["id"]).sort(asc("id")).show()
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| cde|
| 3| xyz|
+---+-----+
Workaround: add a date column to the DataFrame, rank the rows within each id by date in descending order, and keep only the rows with rank == 1. This gives you the latest record for each id. In Scala:
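import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

// Rank rows within each id, newest date first, and keep only the top-ranked row.
df.withColumn("rank", rank().over(Window.partitionBy($"id").orderBy($"date".desc)))
  .filter($"rank" === 1)
  .drop($"rank")
  .orderBy($"id")
  .show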