Flattening JSON into Tabular Structure using Spark

2019-02-15 22:59发布

问题:

I have nested JSON and like to have output in tabular structure. I am able to parse the JSON values individually , but having some problems in tabularizing it. I am able to do it via dataframe easily. But I want do it using "RDD ONLY " functions. Any help much appreciated.

Input JSON:

  { "level":{"productReference":{  

     "prodID":"1234",

     "unitOfMeasure":"EA"

  },

  "states":[  
     {  
        "state":"SELL",
        "effectiveDateTime":"2015-10-09T00:55:23.6345Z",
        "stockQuantity":{  
           "quantity":1400.0,
           "stockKeepingLevel":"A"
        }
     },
     {  
        "state":"HELD",
        "effectiveDateTime":"2015-10-09T00:55:23.6345Z",
        "stockQuantity":{  
           "quantity":800.0,
           "stockKeepingLevel":"B"
        }
     }
  ] }}

Expected Output:

I tried Below Spark code . But getting output like this and Row() object is not able to parse this.

079562193,EA,List(SELLABLE, HELD),List(2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z),List(1400.0, 800.0),List(SINGLE, SINGLE)

def main(Args : Array[String]): Unit = {

  val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  val salesSchema = StructType(Array(
    StructField("prodID", StringType, true),
    StructField("unitOfMeasure", StringType, true),
    StructField("state", StringType, true),
    StructField("effectiveDateTime", StringType, true),
    StructField("quantity", StringType, true),
    StructField("stockKeepingLevel", StringType, true)
  ))

  val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")

  val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => {

        parse(eachJsonMessages)

      }).map(insideEachJson=>{
        implicit  val formats = org.json4s.DefaultFormats

       val prodID = (insideEachJson\ "level" \"productReference" \"TPNB").extract[String].toString
       val unitOfMeasure = (insideEachJson\ "level" \ "productReference" \"unitOfMeasure").extract[String].toString

       val state= (insideEachJson \ "level" \"states").extract[List[JValue]].
          map(x=>(x\"state").extract[String]).toString()
       val effectiveDateTime= (insideEachJson \ "level" \"states").extract[List[JValue]].
         map(x=>(x\"effectiveDateTime").extract[String]).toString
      val quantity= (insideEachJson \ "level" \"states").extract[List[JValue]].
         map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"quantity").extract[Double]).
         toString
      val stockKeepingLevel= (insideEachJson \ "level" \"states").extract[List[JValue]].
         map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"stockKeepingLevel").extract[String]).
       toString

      //Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel)

    println(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel)

      }).collect()

    //  sqlContext.createDataFrame(x,salesSchema).show(truncate = false)

}

回答1:

There are 2 versions of solutions to your question.

Version 1:

def main(Args : Array[String]): Unit = {

  val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  val salesSchema = StructType(Array(
    StructField("prodID", StringType, true),
    StructField("unitOfMeasure", StringType, true),
    StructField("state", StringType, true),
    StructField("effectiveDateTime", StringType, true),
    StructField("quantity", StringType, true),
    StructField("stockKeepingLevel", StringType, true)
  ))

  val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")    

  val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => {

    parse(eachJsonMessages)

  }).map(insideEachJson=>{
    implicit  val formats = org.json4s.DefaultFormats

   val prodID = (insideEachJson\ "level" \"productReference" \"prodID").extract[String].toString
   val unitOfMeasure = (insideEachJson\ "level" \ "productReference" \"unitOfMeasure").extract[String].toString

   val state= (insideEachJson \ "level" \"states").extract[List[JValue]].
      map(x=>(x\"state").extract[String]).toString()
   val effectiveDateTime= (insideEachJson \ "level" \"states").extract[List[JValue]].
     map(x=>(x\"effectiveDateTime").extract[String]).toString
  val quantity= (insideEachJson \ "level" \"states").extract[List[JValue]].
     map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"quantity").extract[Double]).
     toString
  val stockKeepingLevel= (insideEachJson \ "level" \"states").extract[List[JValue]].
     map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"stockKeepingLevel").extract[String]).
   toString

  Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel)

  })

    sqlContext.createDataFrame(x,salesSchema).show(truncate = false)

}

This would give you following output:

+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+
|prodID|unitOfMeasure|state           |effectiveDateTime                                         |quantity           |stockKeepingLevel|
+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+
|1234  |EA           |List(SELL, HELD)|List(2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z)|List(1400.0, 800.0)|List(A, B)       |
+------+-------------+----------------+----------------------------------------------------------+-------------------+-----------------+

Version 2:

def main(Args : Array[String]): Unit = {

  val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  val salesSchema = StructType(Array(
    StructField("prodID", StringType, true),
    StructField("unitOfMeasure", StringType, true),
    StructField("state", ArrayType(StringType, true), true),
    StructField("effectiveDateTime", ArrayType(StringType, true), true),
    StructField("quantity", ArrayType(DoubleType, true), true),
    StructField("stockKeepingLevel", ArrayType(StringType, true), true)
  ))

  val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")    

  val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => {

    parse(eachJsonMessages)

  }).map(insideEachJson=>{
    implicit  val formats = org.json4s.DefaultFormats

   val prodID = (insideEachJson\ "level" \"productReference" \"prodID").extract[String].toString
   val unitOfMeasure = (insideEachJson\ "level" \ "productReference" \"unitOfMeasure").extract[String].toString

   val state= (insideEachJson \ "level" \"states").extract[List[JValue]].
      map(x=>(x\"state").extract[String])
   val effectiveDateTime= (insideEachJson \ "level" \"states").extract[List[JValue]].
     map(x=>(x\"effectiveDateTime").extract[String])
  val quantity= (insideEachJson \ "level" \"states").extract[List[JValue]].
     map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"quantity").extract[Double])
  val stockKeepingLevel= (insideEachJson \ "level" \"states").extract[List[JValue]].
     map(x=>(x\"stockQuantity").extract[JValue]).map(x=>(x\"stockKeepingLevel").extract[String])

  Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel)

  })


    sqlContext.createDataFrame(x,salesSchema).show(truncate = false)

}

This would give you following output:

+------+-------------+------------+------------------------------------------------------+---------------+-----------------+
|prodID|unitOfMeasure|state       |effectiveDateTime                                     |quantity       |stockKeepingLevel|
+------+-------------+------------+------------------------------------------------------+---------------+-----------------+
|1234  |EA           |[SELL, HELD]|[2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z]|[1400.0, 800.0]|[A, B]           |
+------+-------------+------------+------------------------------------------------------+---------------+-----------------+

The difference between Version 1 & 2 is of schema. In Version 1 you are casting every column into String whereas in Version 2 they are being casted into Array.



回答2:

HI below is the "DATAFRAME" ONLY Solution which I developed. Looking for complete "RDD ONLY" solution

def main (Args : Array[String]):Unit = {

    val conf = new SparkConf().setAppName("JSON Read and Write using Spark DataFrame few more options").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val sourceJsonDF = sqlContext.read.json("product.json")

         val jsonFlatDF_level = sourceJsonDF.withColumn("explode_states",explode($"level.states"))
        .withColumn("explode_link",explode($"level._link"))
      .select($"level.productReference.TPNB".as("TPNB"),
        $"level.productReference.unitOfMeasure".as("level_unitOfMeasure"),
        $"level.locationReference.location".as("level_location"),
        $"level.locationReference.type".as("level_type"),
        $"explode_states.state".as("level_state"),
        $"explode_states.effectiveDateTime".as("level_effectiveDateTime"),
        $"explode_states.stockQuantity.quantity".as("level_quantity"),
        $"explode_states.stockQuantity.stockKeepingLevel".as("level_stockKeepingLevel"),
        $"explode_link.rel".as("level_rel"),
        $"explode_link.href".as("level_href"),
        $"explode_link.method".as("level_method"))
jsonFlatDF_oldLevel.show()

  }


回答3:

DataFrame and DataSet are much more optimized than rdd and there are a lot of options to try with to reach to the solution we desire.

In my opinion, DataFrame is developed to make the developers comfortable viewing data in tabular form so that logics can be implemented with ease. So I always suggest users to use dataframe or dataset.

Talking much less, I am posting you the solution below using dataframe. Once you have a dataframe, switching to rdd is very easy.

Your desired solution is below (you will have to find a way to read json file as its done with json string below : thats an assignment for you :) good luck)

import org.apache.spark.sql.functions._
val json = """  { "level":{"productReference":{

                  "prodID":"1234",

                  "unitOfMeasure":"EA"

               },

               "states":[
                  {
                     "state":"SELL",
                     "effectiveDateTime":"2015-10-09T00:55:23.6345Z",
                     "stockQuantity":{
                        "quantity":1400.0,
                        "stockKeepingLevel":"A"
                     }
                  },
                  {
                     "state":"HELD",
                     "effectiveDateTime":"2015-10-09T00:55:23.6345Z",
                     "stockQuantity":{
                        "quantity":800.0,
                        "stockKeepingLevel":"B"
                     }
                  }
               ] }}"""

val rddJson = sparkContext.parallelize(Seq(json))
var df = sqlContext.read.json(rddJson)
df = df.withColumn("prodID", df("level.productReference.prodID"))
  .withColumn("unitOfMeasure", df("level.productReference.unitOfMeasure"))
  .withColumn("states", explode(df("level.states")))
  .drop("level")
df = df.withColumn("state", df("states.state"))
  .withColumn("effectiveDateTime", df("states.effectiveDateTime"))
  .withColumn("quantity", df("states.stockQuantity.quantity"))
  .withColumn("stockKeepingLevel", df("states.stockQuantity.stockKeepingLevel"))
  .drop("states")
df.show(false)

This will give out put as

+------+-------------+-----+-------------------------+--------+-----------------+
|prodID|unitOfMeasure|state|effectiveDateTime        |quantity|stockKeepingLevel|
+------+-------------+-----+-------------------------+--------+-----------------+
|1234  |EA           |SELL |2015-10-09T00:55:23.6345Z|1400.0  |A                |
|1234  |EA           |HELD |2015-10-09T00:55:23.6345Z|800.0   |B                |
+------+-------------+-----+-------------------------+--------+-----------------+

Now that you have desired output as dataframe converting to rdd is just calling .rdd

df.rdd.foreach(println)

will give output as below

[1234,EA,SELL,2015-10-09T00:55:23.6345Z,1400.0,A]
[1234,EA,HELD,2015-10-09T00:55:23.6345Z,800.0,B]

I hope this is helpful