Spark JSON text field to RDD

2019-03-13 10:48发布

I've got a cassandra table with a field of type text named snapshot containing JSON objects:

[identifier, timestamp, snapshot]

I understood that to be able to do transformations on that field with Spark, I need to convert that field of that RDD to another RDD to make transformations on the JSON schema.

Is that correct? How should I proceed to to that?

Edit: For now I managed to create an RDD from a single text field:

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val first = snapshots.first()
val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3)))
firstJson.printSchema()

Which shows me the JSON schema. Good!

How do I proceed to tell Spark that this schema should be applied on all rows of the table Snapshots, to get an RDD on that snapshot field from each row?

1条回答
唯我独甜
2楼-- · 2019-03-13 10:55

Almost there, you just want to pass your an RDD[String] with your json into the jsonRDD method

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val jsons = snapshots.map(_._3) // Get Third Row Element Json(RDD[String]) 
val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // Pass in RDD directly
jsonSchemaRDD.registerTempTable("testjson")
sqlContext.sql("SELECT * FROM testjson where .... ").collect 

A quick example

val stringRDD = sc.parallelize(Seq(""" 
  { "isActive": false,
    "balance": "$1,431.73",
    "picture": "http://placehold.it/32x32",
    "age": 35,
    "eyeColor": "blue"
  }""",
   """{
    "isActive": true,
    "balance": "$2,515.60",
    "picture": "http://placehold.it/32x32",
    "age": 34,
    "eyeColor": "blue"
  }""", 
  """{
    "isActive": false,
    "balance": "$3,765.29",
    "picture": "http://placehold.it/32x32",
    "age": 26,
    "eyeColor": "blue"
  }""")
)
sqlContext.jsonRDD(stringRDD).registerTempTable("testjson")
csc.sql("SELECT age from testjson").collect
//res24: Array[org.apache.spark.sql.Row] = Array([35], [34], [26])
查看更多
登录 后发表回答