I have nested JSON and would like to output it in a tabular structure. I am able to parse the JSON values individually, but I'm having some problems tabularizing them. I can do it easily with a DataFrame, but I want to do it using "RDD ONLY" functions. Any help is much appreciated.
Input JSON:
```json
{
  "level": {
    "productReference": {
      "prodID": "1234",
      "unitOfMeasure": "EA"
    },
    "states": [
      {
        "state": "SELL",
        "effectiveDateTime": "2015-10-09T00:55:23.6345Z",
        "stockQuantity": {
          "quantity": 1400.0,
          "stockKeepingLevel": "A"
        }
      },
      {
        "state": "HELD",
        "effectiveDateTime": "2015-10-09T00:55:23.6345Z",
        "stockQuantity": {
          "quantity": 800.0,
          "stockKeepingLevel": "B"
        }
      }
    ]
  }
}
```
Expected Output:
I tried the Spark code below, but I get output like this, and I cannot turn it into a `Row()`:

```
079562193,EA,List(SELLABLE, HELD),List(2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z),List(1400.0, 800.0),List(SINGLE, SINGLE)
```
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object ProductRddJson {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JSON Read and Write using Spark RDD").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val salesSchema = StructType(Array(
      StructField("prodID", StringType, true),
      StructField("unitOfMeasure", StringType, true),
      StructField("state", StringType, true),
      StructField("effectiveDateTime", StringType, true),
      StructField("quantity", StringType, true),
      StructField("stockKeepingLevel", StringType, true)
    ))
    // textFile yields one record per line, so each line must hold one complete JSON document
    val ReadAlljsonMessageInFile_RDD = sc.textFile("product_rdd.json")
    val x = ReadAlljsonMessageInFile_RDD.map(eachJsonMessages => {
      parse(eachJsonMessages)
    }).map(insideEachJson => {
      implicit val formats: Formats = DefaultFormats
      // The sample input names this field "prodID" (the original code read "TPNB")
      val prodID = (insideEachJson \ "level" \ "productReference" \ "prodID").extract[String]
      val unitOfMeasure = (insideEachJson \ "level" \ "productReference" \ "unitOfMeasure").extract[String]
      // Each value below gathers ONE field across ALL states into a List, then stringifies it
      val state = (insideEachJson \ "level" \ "states").extract[List[JValue]]
        .map(x => (x \ "state").extract[String]).toString
      val effectiveDateTime = (insideEachJson \ "level" \ "states").extract[List[JValue]]
        .map(x => (x \ "effectiveDateTime").extract[String]).toString
      val quantity = (insideEachJson \ "level" \ "states").extract[List[JValue]]
        .map(x => (x \ "stockQuantity").extract[JValue]).map(x => (x \ "quantity").extract[Double])
        .toString
      val stockKeepingLevel = (insideEachJson \ "level" \ "states").extract[List[JValue]]
        .map(x => (x \ "stockQuantity").extract[JValue]).map(x => (x \ "stockKeepingLevel").extract[String])
        .toString
      //Row(prodID,unitOfMeasure,state,effectiveDateTime,quantity,stockKeepingLevel)
      // println returns Unit, so x ends up as an RDD[Unit] rather than an RDD of rows
      println((prodID, unitOfMeasure, state, effectiveDateTime, quantity, stockKeepingLevel))
    }).collect()
    // sqlContext.createDataFrame(x,salesSchema).show(truncate = false)
  }
}
```
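The output shown in the question is one record per JSON document in which every per-state field has been collapsed into a single `List` (and then stringified, e.g. `List(SELLABLE, HELD)`), so it cannot line up with the six `StringType` columns of `salesSchema`. A minimal illustration in plain Scala collections (no Spark), using values copied from the sample input, is to zip the parallel lists back together so that position `i` of every list becomes row `i`. The object name `ZipParallelLists` is hypothetical.

```scala
object ZipParallelLists {
  // Parallel lists, one per field, like those built by the question's code
  val prodID = "1234"
  val unitOfMeasure = "EA"
  val states = List("SELL", "HELD")
  val dates = List("2015-10-09T00:55:23.6345Z", "2015-10-09T00:55:23.6345Z")
  val quantities = List(1400.0, 800.0)
  val levels = List("A", "B")

  // Zip the lists so that element i of each list ends up in row i.
  // Note: zip silently truncates to the length of the shortest list.
  val rows: List[String] =
    states.zip(dates).zip(quantities).zip(levels).map { case (((state, date), qty), level) =>
      List(prodID, unitOfMeasure, state, date, qty, level).mkString(",")
    }

  def main(args: Array[String]): Unit = rows.foreach(println)
}
```

Zipping works, but iterating over `states` once and emitting a full row per element (what `flatMap` does on an RDD) avoids building the parallel lists in the first place and cannot drop rows through truncation.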
`DataFrame` and `Dataset` are much more optimized than `RDD`s (their queries go through Spark's Catalyst optimizer), and they offer many more options for reaching the solution we want. In my opinion, `DataFrame` was developed to let developers work comfortably with data in tabular form so that logic can be implemented with ease, so I always suggest using a `DataFrame` or `Dataset`.

Without further ado, I am posting the solution below using a `DataFrame`. Once you have a `DataFrame`, switching to an `RDD` is very easy. Your desired solution is below (you will have to find a way to read the JSON file the way it is done with the JSON string below; that's an assignment for you :) good luck). This will give output as:

Now that you have the desired output as a `DataFrame`, converting it to an `RDD` is just a call to `.rdd`, which will give output as below:

I hope this is helpful.
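The answer's own code is not reproduced here. As a hedged sketch of the same idea, assuming Spark 2.2+ with a `SparkSession`, `explode` turns each element of `level.states` into its own row, nested fields are selected with dot paths, and `.rdd` then yields an `RDD[Row]`. The object name `DataFrameThenRdd` and the helper `flatten` are hypothetical.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

object DataFrameThenRdd {
  // Hypothetical helper: one output row per element of level.states
  def flatten(df: DataFrame): DataFrame =
    df.select(
        col("level.productReference.prodID").as("prodID"),
        col("level.productReference.unitOfMeasure").as("unitOfMeasure"),
        explode(col("level.states")).as("s")) // one row per array element
      .select(
        col("prodID"), col("unitOfMeasure"),
        col("s.state"), col("s.effectiveDateTime"),
        col("s.stockQuantity.quantity"), col("s.stockQuantity.stockKeepingLevel"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("explode then rdd").master("local[1]").getOrCreate()
    // Assumes one JSON document per line; for pretty-printed files add .option("multiLine", true)
    val flat = flatten(spark.read.json("product_rdd.json"))
    flat.show(truncate = false)
    val rows: RDD[Row] = flat.rdd // DataFrame -> RDD[Row]
    rows.collect().foreach(println)
    spark.stop()
  }
}
```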
Hi, below is the "DATAFRAME ONLY" solution I developed. I'm looking for a complete "RDD ONLY" solution.
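A minimal RDD-only sketch for the sample JSON, assuming json4s-jackson is on the classpath (as the question's code already requires) and that each line of `product_rdd.json` holds one complete document. Instead of gathering each field across all states into a `List`, it uses `flatMap` so that every element of `states` becomes its own row, with the product-level fields repeated. The object name `RddOnlyFlatten` and the helper `explodeStates` are hypothetical; `quantity` is kept as a string to line up with the question's `salesSchema`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object RddOnlyFlatten {
  // (prodID, unitOfMeasure, state, effectiveDateTime, quantity, stockKeepingLevel),
  // all Strings so they match the StringType columns of salesSchema
  type StateRow = (String, String, String, String, String, String)

  // Hypothetical helper: one output row per element of level.states
  def explodeStates(json: JValue): List[StateRow] = {
    implicit val formats: Formats = DefaultFormats
    val product = json \ "level" \ "productReference"
    val prodID = (product \ "prodID").extract[String]
    val unitOfMeasure = (product \ "unitOfMeasure").extract[String]
    (json \ "level" \ "states").extract[List[JValue]].map { s =>
      val stock = s \ "stockQuantity"
      (prodID,        // repeated on every row
       unitOfMeasure, // repeated on every row
       (s \ "state").extract[String],
       (s \ "effectiveDateTime").extract[String],
       (stock \ "quantity").extract[Double].toString,
       (stock \ "stockKeepingLevel").extract[String])
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Flatten nested JSON with RDDs").setMaster("local[1]")
    val sc = new SparkContext(conf)
    // Assumes each line of the file is one complete JSON document
    val rows = sc.textFile("product_rdd.json")
      .map(line => parse(line))
      .flatMap(json => explodeStates(json)) // flatMap, not map: one document -> many rows
    rows.map(_.productIterator.mkString(",")).collect().foreach(println)
    sc.stop()
  }
}
```

If a DataFrame with `salesSchema` is still wanted at the end, each tuple can be wrapped with `Row.fromTuple` and the resulting `RDD[Row]` passed to `createDataFrame(rowRdd, salesSchema)`; all six fields are already strings, so they match its `StringType` columns.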
There are two versions of the solution to your question.

Version 1:

This would give you the following output:

Version 2:

This would give you the following output:

The difference between Version 1 and Version 2 is the schema. In Version 1 you are casting every column to `String`, whereas in Version 2 they are being cast to `Array`.
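Neither version's code survives here, so as an illustration only: the two shapes described above correspond to schemas like the following, where Version 1 stores each field as a `String` and Version 2 stores per-state fields as an `Array` with one element per entry in `states`. The object name, the field subset, and the array element types are assumptions.

```scala
import org.apache.spark.sql.types._

object VersionSchemas {
  // Version 1 style: every field is a single String
  val stringSchema: StructType = StructType(Seq(
    StructField("prodID", StringType, nullable = true),
    StructField("state", StringType, nullable = true),
    StructField("quantity", StringType, nullable = true)))

  // Version 2 style: per-state fields are Arrays (one element per entry in "states")
  val arraySchema: StructType = StructType(Seq(
    StructField("prodID", StringType, nullable = true),
    StructField("state", ArrayType(StringType), nullable = true),
    StructField("quantity", ArrayType(StringType), nullable = true)))

  def main(args: Array[String]): Unit = {
    // treeString renders a schema in the same tree form that printSchema() prints
    println(stringSchema.treeString)
    println(arraySchema.treeString)
  }
}
```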