I have a dataframe joinDf created from joining the following four dataframes on userId:
val detailsDf = Seq((123,"first123","xyz"))
  .toDF("userId","firstName","address")

val emailDf = Seq((123,"abc@gmail.com"),
                  (123,"def@gmail.com"))
  .toDF("userId","email")

val foodDf = Seq((123,"food2",false,"Italian",2),
                 (123,"food3",true,"American",3),
                 (123,"food1",true,"Mediterranean",1))
  .toDF("userId","foodName","isFavFood","cuisine","score")

val gameDf = Seq((123,"chess",false,2),
                 (123,"football",true,1))
  .toDF("userId","gameName","isOutdoor","score")

val joinDf = detailsDf
  .join(emailDf, Seq("userId"))
  .join(foodDf, Seq("userId"))
  .join(gameDf, Seq("userId"))
The user's food and game favorites should be ordered by score in ascending order.
I am trying to create a result from this joinDf where the JSON looks like the following:
[
  {
    "userId": "123",
    "firstName": "first123",
    "address": "xyz",
    "UserFoodFavourites": [
      {
        "foodName": "food1",
        "isFavFood": "true",
        "cuisine": "Mediterranean"
      },
      {
        "foodName": "food2",
        "isFavFood": "false",
        "cuisine": "Italian"
      },
      {
        "foodName": "food3",
        "isFavFood": "true",
        "cuisine": "American"
      }
    ],
    "UserEmail": [
      "abc@gmail.com",
      "def@gmail.com"
    ],
    "UserGameFavourites": [
      {
        "gameName": "football",
        "isOutdoor": "true"
      },
      {
        "gameName": "chess",
        "isOutdoor": "false"
      }
    ]
  }
]
Should I use joinDf.groupBy().agg(collect_set())?
Any help would be appreciated.
My solution is based on the answers found here and here.
It uses a Window function and shows how to create a nested list of food preferences for a given userId based on the food score.

Here we are creating a struct of FoodDetails from the columns we have.

Here we are creating a windowing function which will accumulate the list for a userId based on FoodDetails.score in descending order. When applied, the windowing function keeps accumulating the list as it encounters new rows with the same userId. After we have finished accumulating, we have to do a groupBy over the userId to select the largest list.

This is what the result finally looks like:
Given this dataframe, it should be easier to write out the nested json.
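A minimal sketch of the steps described above (struct, accumulating window, then groupBy keeping the largest list), restricted to foodDf. The names foodStructDf, byScore, FoodList and nestedFoodDf are placeholders chosen for illustration, and the local SparkSession is an assumption for running it outside spark-shell. It orders by score descending, as described; switching .desc to .asc would give the ascending order the question asks for.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_list, max, struct}

// Assumption: a local session; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("nested-food").getOrCreate()
import spark.implicits._

val foodDf = Seq((123,"food2",false,"Italian",2),
                 (123,"food3",true,"American",3),
                 (123,"food1",true,"Mediterranean",1))
  .toDF("userId","foodName","isFavFood","cuisine","score")

// Step 1: pack the per-food columns into a single struct column.
val foodStructDf = foodDf.select(
  col("userId"),
  struct(col("foodName"), col("isFavFood"), col("cuisine"), col("score")).as("FoodDetails"))

// Step 2: a window per userId ordered by score; collect_list over it
// produces a growing (running) list on each successive row.
val byScore = Window.partitionBy("userId").orderBy(col("FoodDetails.score").desc)
val accumulated = foodStructDf
  .withColumn("FoodList", collect_list(col("FoodDetails")).over(byScore))

// Step 3: every running list is a prefix of the next one, so max()
// picks the complete list for each userId.
val nestedFoodDf = accumulated
  .groupBy("userId")
  .agg(max(col("FoodList")).as("UserFoodFavourites"))

nestedFoodDf.show(truncate = false)
```

The same pattern could be repeated for gameDf; calling nestedFoodDf.toJSON then yields one JSON string per user.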
The main problem with joining before grouping and collecting lists is that the join produces a lot of records for the groupBy to collapse: in your example there are 12 records (1 × 2 × 3 × 2) after the join and before the groupBy. You would also need to worry about picking "firstName" and "address" from detailsDf out of 12 duplicates. To avoid both problems, you could pre-process the food, email and game dataframes using struct and groupBy, and then join them to detailsDf with no risk of exploding your data due to multiple records with the same userId in the joined tables.
Output:
Since all the groupBy operations and the joins are on userId, Spark will optimise this quite well.
UPDATE 1: After @user238607 pointed out that I had missed the original requirement of food preferences being sorted by score, I did a quick fix: I placed the score column as the first element of the UserFoodFavourites struct and used the sort_array function to arrange the data in the desired order without forcing an extra shuffle operation. I have updated the code and its output.
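As a rough illustration of the pre-aggregate-then-join approach with sort_array, the following sketch groups each child dataframe on userId before joining it to detailsDf. The names foodAgg, gameAgg, emailAgg and resultDf are placeholders, the local SparkSession is an assumption, and sorting the email list is an extra choice made here only to keep the output deterministic.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, sort_array, struct}

// Assumption: a local session; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("nested-json").getOrCreate()
import spark.implicits._

val detailsDf = Seq((123,"first123","xyz")).toDF("userId","firstName","address")
val emailDf = Seq((123,"abc@gmail.com"), (123,"def@gmail.com")).toDF("userId","email")
val foodDf = Seq((123,"food2",false,"Italian",2),
                 (123,"food3",true,"American",3),
                 (123,"food1",true,"Mediterranean",1))
  .toDF("userId","foodName","isFavFood","cuisine","score")
val gameDf = Seq((123,"chess",false,2), (123,"football",true,1))
  .toDF("userId","gameName","isOutdoor","score")

// score is the first struct field, so sort_array orders each list by it (ascending).
val foodAgg = foodDf.groupBy("userId").agg(
  sort_array(collect_list(
    struct(col("score"), col("foodName"), col("isFavFood"), col("cuisine"))
  )).as("UserFoodFavourites"))

val gameAgg = gameDf.groupBy("userId").agg(
  sort_array(collect_list(
    struct(col("score"), col("gameName"), col("isOutdoor"))
  )).as("UserGameFavourites"))

val emailAgg = emailDf.groupBy("userId").agg(
  sort_array(collect_list(col("email"))).as("UserEmail"))

// Each aggregate has at most one row per userId, so the joins cannot multiply rows.
val resultDf = detailsDf
  .join(foodAgg, Seq("userId"))
  .join(emailAgg, Seq("userId"))
  .join(gameAgg, Seq("userId"))

// One JSON document per user.
resultDf.toJSON.collect().foreach(println)
```

Note that score stays inside each nested struct in this sketch; if it must not appear in the final JSON, it would need to be dropped from the structs after sorting. The inner joins also drop any user missing from one of the child dataframes, which matches the joins in the question.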