I have a dataframe joinDf created from joining the following four dataframes on userId:
val detailsDf = Seq((123,"first123","xyz"))
  .toDF("userId","firstName","address")

val emailDf = Seq((123,"abc@gmail.com"),
                  (123,"def@gmail.com"))
  .toDF("userId","email")

val foodDf = Seq((123,"food2",false,"Italian",2),
                 (123,"food3",true,"American",3),
                 (123,"food1",true,"Mediterranean",1))
  .toDF("userId","foodName","isFavFood","cuisine","score")

val gameDf = Seq((123,"chess",false,2),
                 (123,"football",true,1))
  .toDF("userId","gameName","isOutdoor","score")

val joinDf = detailsDf
  .join(emailDf, Seq("userId"))
  .join(foodDf, Seq("userId"))
  .join(gameDf, Seq("userId"))
The user's food and game favorites should be ordered by score in ascending order.
I am trying to create a result from this joinDf where the JSON looks like the following:
[
  {
    "userId": "123",
    "firstName": "first123",
    "address": "xyz",
    "UserFoodFavourites": [
      {
        "foodName": "food1",
        "isFavFood": "true",
        "cuisine": "Mediterranean"
      },
      {
        "foodName": "food2",
        "isFavFood": "false",
        "cuisine": "Italian"
      },
      {
        "foodName": "food3",
        "isFavFood": "true",
        "cuisine": "American"
      }
    ],
    "UserEmail": [
      "abc@gmail.com",
      "def@gmail.com"
    ],
    "UserGameFavourites": [
      {
        "gameName": "football",
        "isOutdoor": "true"
      },
      {
        "gameName": "chess",
        "isOutdoor": "false"
      }
    ]
  }
]
Should I use joinDf.groupBy().agg(collect_set())?
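Something along these lines is roughly what I had in mind (just a sketch, and I don't think collect_set preserves the score ordering):

import org.apache.spark.sql.functions.{collect_set, struct}

joinDf
  .groupBy("userId", "firstName", "address")
  .agg(
    collect_set(struct("foodName", "isFavFood", "cuisine")).as("UserFoodFavourites"),
    collect_set("email").as("UserEmail"),
    collect_set(struct("gameName", "isOutdoor")).as("UserGameFavourites"))
  .show(false)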
Any help would be appreciated.
My solution is based on the answers found here and here.
It uses the Window function. It shows how to create a nested list of food preferences for a given userId based on the food score. Here we are creating a struct of FoodDetails from the columns we have:
import org.apache.spark.sql.functions._   // struct, desc, collect_list, max
// spark.implicits._ is assumed to be in scope (as in spark-shell) for the $ syntax

val foodModifiedDf = foodDf.withColumn("FoodDetails",
    struct("foodName", "isFavFood", "cuisine", "score"))
  .drop("foodName", "isFavFood", "cuisine", "score")

println("Just printing the food details here")
foodModifiedDf.show(10, truncate = false)
Here we are creating a window function which will accumulate the list for a userId, ordered by FoodDetails.score in descending order. When applied, the window function keeps accumulating the list as it encounters new rows with the same userId. After the accumulation is done, we have to do a groupBy over the userId to select the largest list.
import org.apache.spark.sql.expressions.Window
// Window that accumulates FoodDetails structs per userId, highest score first
val window = Window.partitionBy("userId").orderBy(desc("FoodDetails.score"))

val userAndFood = detailsDf.join(foodModifiedDf, "userId")

// Running collect_list over the window: each row carries the list accumulated so far
val newUF = userAndFood.select($"*", collect_list("FoodDetails").over(window) as "FDNew")

println("UserAndFood dataframe after windowing function applied")
newUF.show(10, truncate = false)

// Keep only the largest (i.e. complete) accumulated list per userId
val resultUF = newUF.groupBy("userId")
  .agg(max("FDNew"))

println("Final result after selecting the maximum length list")
resultUF.show(10, truncate = false)
This is how the result finally looks:
+------+-----------------------------------------------------------------------------------------+
|userId|max(FDNew) |
+------+-----------------------------------------------------------------------------------------+
|123 |[[food3, true, American, 3], [food2, false, Italian, 2], [food1, true, Mediterranean, 1]]|
+------+-----------------------------------------------------------------------------------------+
Given this dataframe, it should be easier to write out the nested JSON.
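For completeness, a rough sketch (not part of the code above) of how that JSON could be printed from resultUF; the renamed column name is only an example:

resultUF
  .withColumnRenamed("max(FDNew)", "UserFoodFavourites")   // give the aggregated column a friendlier name
  .toJSON
  .show(truncate = false)

For a file instead of the console, the same dataframe can be written with .write.json(path).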
The main problem with joining before grouping and collecting lists is that the join produces a lot of records for the groupBy to collapse: in your example there are 12 records after the join and before the groupBy, and you would also need to worry about picking "firstName" and "address" from detailsDf out of those 12 duplicates. To avoid both problems, you could pre-process the food, email and game dataframes using struct and groupBy, and then join them to detailsDf with no risk of exploding your data due to multiple records with the same userId in the joined tables.
import org.apache.spark.sql.functions._   // collect_list, struct, sort_array
// spark.implicits._ is assumed to be in scope for $ and toDF

val detailsDf = Seq((123,"first123","xyz"))
  .toDF("userId","firstName","address")

val emailDf = Seq((123,"abc@gmail.com"),
                  (123,"def@gmail.com"))
  .toDF("userId","email")

val foodDf = Seq((123,"food2",false,"Italian",2),
                 (123,"food3",true,"American",3),
                 (123,"food1",true,"Mediterranean",1))
  .toDF("userId","foodName","isFavFood","cuisine","score")

val gameDf = Seq((123,"chess",false,2),
                 (123,"football",true,1))
  .toDF("userId","gameName","isOutdoor","score")
val emailGrp = emailDf.groupBy("userId").agg(collect_list("email").as("UserEmail"))

val foodGrp = foodDf
  .select($"userId", struct("score", "foodName", "isFavFood", "cuisine").as("UserFoodFavourites"))
  .groupBy("userId").agg(sort_array(collect_list("UserFoodFavourites")).as("UserFoodFavourites"))

val gameGrp = gameDf
  .select($"userId", struct("gameName", "isOutdoor", "score").as("UserGameFavourites"))
  .groupBy("userId").agg(collect_list("UserGameFavourites").as("UserGameFavourites"))

val result = detailsDf.join(emailGrp, Seq("userId"))
  .join(foodGrp, Seq("userId"))
  .join(gameGrp, Seq("userId"))

result.show(100, false)
Output:
+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
|userId|firstName|address|UserEmail |UserFoodFavourites |UserGameFavourites |
+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
|123 |first123 |xyz |[abc@gmail.com, def@gmail.com]|[[1, food1, true, Mediterranean], [2, food2, false, Italian], [3, food3, true, American]]|[[chess, false, 2], [football, true, 1]]|
+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
As all the groupBy operations and the joins are done on userId, Spark will optimise this quite well.
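To produce the JSON output asked for in the question, one option (a sketch; the output path is only an example) is to write the grouped result directly, since every row already carries the nested structures:

result
  .coalesce(1)                   // one output file; only sensible for small results
  .write
  .mode("overwrite")
  .json("/tmp/user_favourites")  // example path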
UPDATE 1: After @user238607 pointed out that I had missed the original requirement of the food preferences being sorted by score, I did a quick fix: the score column is now the first element of the UserFoodFavourites struct, and the sort_array function arranges the data in the desired order without forcing an extra shuffle operation. I have updated the code and its output above.
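Note that the desired JSON in the question does not show the score field inside UserFoodFavourites. If it should be removed after sorting, one possibility (a sketch assuming Spark 2.4+ for the transform higher-order function; foodGrpNoScore is just an illustrative name) is:

val foodGrpNoScore = foodGrp.withColumn(
  "UserFoodFavourites",
  // drop the score field from each struct after sort_array has done its job
  expr("transform(UserFoodFavourites, x -> named_struct(" +
       "'foodName', x.foodName, 'isFavFood', x.isFavFood, 'cuisine', x.cuisine))"))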