I have 3 dataframes generated from 3 different processes. Every dataframe has columns with the same names.
My dataframes look like this:

dataframe 1:
id val1 val2 val3 val4
1 null null null null
2 A2 A21 A31 A41

dataframe 2:
id val1 val2 val3 val4
1 B1 B21 B31 B41
2 null null null null

dataframe 3:
id val1 val2 val3 val4
1 C1 C2 C3 C4
2 C11 C12 C13 C14
Out of these 3 dataframes, I want to create two dataframes (final and consolidated).
For final, the order of preference is dataframe 1 > dataframe 2 > dataframe 3: if a result is present in dataframe 1 (val1 != null), I will store that row in the final dataframe; otherwise I fall back to the next dataframe in the order.
My final result should be:
id finalVal1 finalVal2 finalVal3 finalVal4
1 B1 B21 B31 B41
2 A2 A21 A31 A41
The consolidated dataframe will store the results from all 3.
How can I do that efficiently?
If I understood you correctly, for each row you want to find the first non-null values, looking first into the first table, then the second table, then the third table.
You simply need to join these three tables on id and then use the coalesce function to get the first non-null element:
import org.apache.spark.sql.functions._
val df1 = sc.parallelize(Seq(
(1,null,null,null,null),
(2,"A2","A21","A31", "A41"))
).toDF("id", "val1", "val2", "val3", "val4")
val df2 = sc.parallelize(Seq(
(1,"B1","B21","B31", "B41"),
(2,null,null,null,null))
).toDF("id", "val1", "val2", "val3", "val4")
val df3 = sc.parallelize(Seq(
(1,"C1","C2","C3","C4"),
(2,"C11","C12","C13", "C14"))
).toDF("id", "val1", "val2", "val3", "val4")
val consolidated = df1.join(df2, "id").join(df3, "id").select(
df1("id"),
coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"),
coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"),
coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"),
coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4")
)
This gives you the expected output:

+---+---------+---------+---------+---------+
| id|finalVal1|finalVal2|finalVal3|finalVal4|
+---+---------+---------+---------+---------+
|  1|       B1|      B21|      B31|      B41|
|  2|       A2|      A21|      A31|      A41|
+---+---------+---------+---------+---------+
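One caveat: the inner joins above assume that every id is present in all three dataframes. If an id can be missing from one of them, a full outer join keeps every id while coalesce still picks the first non-null value. A rough sketch (finalOuter is just an illustrative name):

// Same idea with full outer joins, so ids missing from some dataframes survive.
// Joining on Seq("id") also keeps a single id column in the result.
val finalOuter = df1
  .join(df2, Seq("id"), "full_outer")
  .join(df3, Seq("id"), "full_outer")
  .select(
    col("id"),
    coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"),
    coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"),
    coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"),
    coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4")
  )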
Edit: a new solution that also handles partially null rows. It avoids joins, but uses a window function and a distinct...
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

case class a(id:Int,val1:String,val2:String,val3:String,val4:String)
val df1 = sc.parallelize(List(
a(1,null,null,null,null),
a(2,"A2","A21","A31","A41"),
a(3,null,null,null,null))).toDF()
val df2 = sc.parallelize(List(
a(1,"B1",null,"B31","B41"),
a(2,null,null,null,null),
a(3,null,null,null,null))).toDF()
val df3 = sc.parallelize(List(
a(1,"C1","C2","C3","C4"),
a(2,"C11","C12","C13","C14"),
a(3,"C11","C12","C13","C14"))).toDF()
// true if at least one value column (every column except id) is non-null
val anyNotNull = df1.columns.tail.map(c => col(c).isNotNull).reduce(_ || _)

// keep only rows that carry some information and tag each source
// with its preference rank in column foo (1 = highest priority)
val consolidated = {
  df1
    .filter(anyNotNull)
    .withColumn("foo", lit(1))
    .unionAll(df2.filter(anyNotNull).withColumn("foo", lit(2)))
    .unionAll(df3.filter(anyNotNull).withColumn("foo", lit(3)))
}
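Note: on Spark 2.x, unionAll is deprecated in favour of union, which has the same positional, non-deduplicating behaviour, so the same construction can be written as:

// equivalent construction with the non-deprecated union (Spark 2.x and later)
val consolidated = df1.filter(anyNotNull).withColumn("foo", lit(1))
  .union(df2.filter(anyNotNull).withColumn("foo", lit(2)))
  .union(df3.filter(anyNotNull).withColumn("foo", lit(3)))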
val w = Window.partitionBy('id).orderBy('foo)
val coalesced = col("id") +: df1.columns.tail.map(c => first(col(c), true).over(w).as(c))

scala> consolidated.select(coalesced: _*).show()
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
|  1|  B1|null| B31| B41|
|  1|  B1|  C2| B31| B41|
|  3| C11| C12| C13| C14|
|  2|  A2| A21| A31| A41|
|  2|  A2| A21| A31| A41|
+---+----+----+----+----+

The duplicate and partially null rows appear because the default window frame only covers rows up to the current one; na.drop and distinct clean them up:

val finalDF = consolidated.select(coalesced: _*).na.drop.distinct
scala> finalDF.show()
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
|  1|  B1|  C2| B31| B41|
|  3| C11| C12| C13| C14|
|  2|  A2| A21| A31| A41|
+---+----+----+----+----+
Old solution: if rows are either entirely null or contain no nulls at all, you can do this (edit: the advantage over the other solution is that you avoid the distinct):
data:
case class a(id:Int,val1:String,val2:String,val3:String,val4:String)
val df1 = sc.parallelize(List(
a(1,null,null,null,null),
a(2,"A2","A21","A31","A41"),
a(3,null,null,null,null))).toDF()
val df2 = sc.parallelize(List(
a(1,"B1","B21","B31","B41"),
a(2,null,null,null,null),
a(3,null,null,null,null))).toDF()
val df3 = sc.parallelize(List(
a(1,"C1","C2","C3","C4"),
a(2,"C11","C12","C13","C14"),
a(3,"C11","C12","C13","C14"))).toDF()
consolidated:
val consolidated = {
df1.na.drop.withColumn("foo",lit(1))
.unionAll(df2.na.drop.withColumn("foo",lit(2)))
.unionAll(df3.na.drop.withColumn("foo",lit(3)))
}
scala> consolidated.show()
+---+----+----+----+----+---+
| id|val1|val2|val3|val4|foo|
+---+----+----+----+----+---+
|  2|  A2| A21| A31| A41|  1|
|  1|  B1| B21| B31| B41|  2|
|  1|  C1|  C2|  C3|  C4|  3|
|  2| C11| C12| C13| C14|  3|
|  3| C11| C12| C13| C14|  3|
+---+----+----+----+----+---+
Final:
val w = Window.partitionBy('id).orderBy('foo)
val finalDF = consolidated
.withColumn("foo2",rank().over(w))
.filter('foo2===1)
.drop("foo").drop("foo2")
scala> finalDF.show()
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
|  1|  B1| B21| B31| B41|
|  3| C11| C12| C13| C14|
|  2|  A2| A21| A31| A41|
+---+----+----+----+----+
Below is an example of joining six tables/dataframes (not using SQL).
retail_db is a well-known sample database; you can easily find it online.
Problem: get all customers from TX who bought fitness items.
val df_customers = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "customers").option("user", "root").option("password", "root").load()
val df_products = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "products").option("user", "root").option("password", "root").load()
val df_orders = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "orders").option("user", "root").option("password", "root").load()
val df_order_items = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "order_items").option("user", "root").option("password", "root").load()
val df_categories = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "categories").option("user", "root").option("password", "root").load()
val df_departments = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "departments").option("user", "root").option("password", "root").load()
val df_order_items_all = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "order_all").option("user", "root").option("password", "root").load()
val jeCustOrd=df_customers.col("customer_id")===df_orders.col("order_customer_id")
val jeOrdItem=df_orders.col("order_id")===df_order_items.col("order_item_order_id")
val jeProdOrdItem=df_products.col("product_id")===df_order_items.col("order_item_product_id")
val jeProdCat=df_products.col("product_category_id")===df_categories.col("category_id")
val jeCatDept=df_categories.col("category_department_id")===df_departments.col("department_id")
df_customers.where("customer_state = 'TX'")
  .join(df_orders, jeCustOrd).join(df_order_items, jeOrdItem).join(df_products, jeProdOrdItem)
  .join(df_categories, jeProdCat).join(df_departments, jeCatDept)
  .filter("department_name = 'Fitness'")
  .select("customer_id", "customer_fname", "customer_lname", "customer_street", "customer_city",
    "customer_state", "customer_zipcode", "order_id", "category_name", "department_name")
  .show(5)
If they come from three different database tables, I would use pushdown filters so that the filtering happens on the database server, and then join the resulting dataframes with the DataFrame join function.
If they are not from database tables, you can use the filter and map higher-order functions to do the same in parallel.
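A rough sketch of the pushdown idea, reusing the retail_db JDBC settings from the answer above (the exact filter and join are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// JDBC source; Spark can push simple filters down to the database as a WHERE clause,
// so only matching rows are transferred over the network.
val customers = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/retail_db?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "customers")
  .option("user", "root").option("password", "root")
  .load()

// this filter is eligible for pushdown (verify with txCustomers.explain())
val txCustomers = customers.filter("customer_state = 'TX'")

// the join itself then runs in Spark on the already-filtered dataframes, e.g.:
// txCustomers.join(df_orders, txCustomers("customer_id") === df_orders("order_customer_id"))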