Spark dataframe to nested JSON

Posted on 2019-07-24 17:03

Question:

I have a dataframe joinDf created from joining the following four dataframes on userId:

val detailsDf = Seq((123,"first123","xyz"))
                .toDF("userId","firstName","address")


val emailDf = Seq((123,"abc@gmail.com"),
                  (123,"def@gmail.com"))
              .toDF("userId","email")


val foodDf = Seq((123,"food2",false,"Italian",2),
                 (123,"food3",true,"American",3),
                 (123,"food1",true,"Mediterranean",1))
            .toDF("userId","foodName","isFavFood","cuisine","score")


val gameDf = Seq((123,"chess",false,2),
                 (123,"football",true,1))
             .toDF("userId","gameName","isOutdoor","score")

val joinDf = detailsDf
            .join(emailDf, Seq("userId"))
            .join(foodDf, Seq("userId"))
            .join(gameDf, Seq("userId"))

The user's food and game favourites should be ordered by score in ascending order.

I am trying to create a result from this joinDf where the JSON looks like the following:

[
  {
    "userId": "123",
    "firstName": "first123",
    "address": "xyz",
    "UserFoodFavourites": [
      {
        "foodName": "food1",
        "isFavFood": "true",
        "cuisine": "Mediterranean"
      },
      {
        "foodName": "food2",
        "isFavFood": "false",
        "cuisine": "Italian"
      },
      {
        "foodName": "food3",
        "isFavFood": "true",
        "cuisine": "American"
      }
    ],
    "UserEmail": [
      "abc@gmail.com",
      "def@gmail.com"
    ],
    "UserGameFavourites": [
      {
        "gameName": "football",
        "isOutdoor": "true"
      },
      {
        "gameName": "chess",
        "isOutdoor": "false"
      }
    ]
  }
]

Should I use joinDf.groupBy().agg(collect_set())?

Any help would be appreciated.

Answer 1:

My solution is based on the answers found here and here

It uses a Window function to build a nested list of food preferences for a given userId, ordered by the food score. First we create a FoodDetails struct from the individual food columns:

import org.apache.spark.sql.functions._

val foodModifiedDf = foodDf.withColumn("FoodDetails",
                            struct("foodName","isFavFood", "cuisine","score"))
                            .drop("foodName","isFavFood", "cuisine","score")

println("Just printing the food detials here")
foodModifiedDf.show(10, truncate = false)

Next we define a Window function that accumulates the list of FoodDetails per userId, ordered by FoodDetails.score in descending order. As the window is applied, the list keeps growing with each new row for the same userId, so afterwards we group by userId and keep the longest (complete) list.

import org.apache.spark.sql.expressions.Window


val window = Window.partitionBy("userId").orderBy( desc("FoodDetails.score"))

val userAndFood = detailsDf.join(foodModifiedDf, "userId")

val newUF  = userAndFood.select($"*", collect_list("FoodDetails").over(window) as "FDNew")

println(" UserAndFood dataframe after windowing function applied")
newUF.show(10, truncate = false)

val resultUF = newUF.groupBy("userId")
                  .agg(max("FDNew"))

println("Final result after select the maximum length list")
resultUF.show(10, truncate = false)

This is what the result finally looks like:

+------+-----------------------------------------------------------------------------------------+
|userId|max(FDNew)                                                                               |
+------+-----------------------------------------------------------------------------------------+
|123   |[[food3, true, American, 3], [food2, false, Italian, 2], [food1, true, Mediterranean, 1]]|
+------+-----------------------------------------------------------------------------------------+

Given this dataframe, it should be straightforward to write out the nested JSON.
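
For illustration, one way to serialise that grouped dataframe to JSON (a minimal sketch; the column rename and the output path are my assumptions, not part of the original answer):

// Rename the aggregated column, then let Spark serialise each row as a JSON string.
val jsonReady = resultUF.withColumnRenamed("max(FDNew)", "UserFoodFavourites")

// toJSON turns every row into a JSON document; printing is enough for a quick check.
jsonReady.toJSON.show(truncate = false)

// Or write the documents out to files (the path here is hypothetical).
// jsonReady.write.mode("overwrite").json("/tmp/user_food_favourites")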



Answer 2:

The main problem with joining before grouping and collecting lists is that the join produces many records for the groupBy to collapse: in your example there are 12 records after the join and before the groupBy, and you would also need to worry about picking "firstName" and "address" from detailsDf out of those 12 duplicates. To avoid both problems, you can pre-process the food, email and game dataframes using struct and groupBy, and then join them to detailsDf with no risk of exploding your data due to multiple records with the same userId in the joined tables.

val detailsDf = Seq((123,"first123","xyz"))
            .toDF("userId","firstName","address")


val emailDf = Seq((123,"abc@gmail.com"),
              (123,"def@gmail.com"))
          .toDF("userId","email")


val foodDf = Seq((123,"food2",false,"Italian",2),
             (123,"food3",true,"American",3),
             (123,"food1",true,"Mediterranean",1))
        .toDF("userId","foodName","isFavFood","cuisine","score")


val gameDf = Seq((123,"chess",false,2),
             (123,"football",true,1))
         .toDF("userId","gameName","isOutdoor","score")

import org.apache.spark.sql.functions._

val emailGrp = emailDf.groupBy("userId").agg(collect_list("email").as("UserEmail"))

val foodGrp = foodDf
          .select($"userId", struct("score", "foodName","isFavFood","cuisine").as("UserFoodFavourites"))
          .groupBy("userId").agg(sort_array(collect_list("UserFoodFavourites")).as("UserFoodFavourites"))

val gameGrp = gameDf
          .select($"userId", struct("gameName","isOutdoor","score").as("UserGameFavourites"))
          .groupBy("userId").agg(collect_list("UserGameFavourites").as("UserGameFavourites"))

val result = detailsDf.join(emailGrp, Seq("userId"))
        .join(foodGrp, Seq("userId"))
        .join(gameGrp, Seq("userId"))

result.show(100, false)

Output:

+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
|userId|firstName|address|UserEmail                     |UserFoodFavourites                                                                       |UserGameFavourites                      |
+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+
|123   |first123 |xyz    |[abc@gmail.com, def@gmail.com]|[[1, food1, true, Mediterranean], [2, food2, false, Italian], [3, food3, true, American]]|[[chess, false, 2], [football, true, 1]]|
+------+---------+-------+------------------------------+-----------------------------------------------------------------------------------------+----------------------------------------+

Since all the groupBy operations and the joins are keyed on userId, Spark will optimise this quite well.
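
To get from this dataframe to the nested JSON in the question, something along these lines should work; stripping the helper score field relies on the SQL transform function (Spark 2.4+), and this step as well as the output path are my additions, not part of the original answer:

// The score field in UserFoodFavourites was only needed for sorting; drop it from each struct.
val cleaned = result.withColumn("UserFoodFavourites",
  expr("transform(UserFoodFavourites, x -> named_struct('foodName', x.foodName, 'isFavFood', x.isFavFood, 'cuisine', x.cuisine))"))

// Each row becomes one JSON document with the nested arrays already in place.
cleaned.toJSON.collect().foreach(println)

// Or persist as JSON files (path hypothetical).
// cleaned.write.mode("overwrite").json("/tmp/user_profiles_json")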

UPDATE 1: After @user238607 pointed out that I had missed the original requirement of sorting food preferences by score, I made a quick fix: I placed the score column as the first element of the UserFoodFavourites struct and used the sort_array function to arrange the data in the desired order without forcing an extra shuffle. The code and its output above have been updated accordingly.