Use schema to convert ConsumerRecord value to Data

2019-06-23 03:34发布

问题:

I am using Spark 2.0.2, with Kafka 0.11.0, and I am trying to consume message from kafka in spark streaming. Following is the code:

val topics = "notes"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:7092",
  "schema.registry.url" -> "http://localhost:7070",
  "group.id" -> "connect-cluster1",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
val topicSet: Set[String] = Set(topics)
val stream = KafkaUtils.createDirectStream[String, String](
  SparkStream.ssc,
  PreferConsistent,
  Subscribe[String, String](topicSet, kafkaParams)
)
stream.foreachRDD ( rdd => {
  rdd.foreachPartition(iterator => {
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  })
})

If Kafka message contain records, the output would be:

{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312886984, "createdby": "karthik", "notes": "testing20"}
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312890472, "createdby": "karthik", "notes": "testing21"}

Thus, received message is Avro decoded as seen from the consumerRecord's value. Now I need those records in a dataframe format, but I do not know how to proceed from here, even with the schema at hand as follows:

val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val m = sr.getLatestSchemaMetadata(topics + "-value")
val schemaId = m.getId
val schemaString = m.getSchema

val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)
val parser = new Schema.Parser()
val avroSchema = parser.parse(schemaString)
println(avroSchema)

with the schema printed as follows:

{"type":"record","name":"notes","namespace":"db","fields":[{"name":"id","type":["null","string"],"default":null},{"name":"createdat","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"createdby","type":["null","string"],"default":null},{"name":"notes","type":["null","string"],"default":null}],"connect.name":"db.notes"}

Can anyone help me understand how to get the dataframe from the consumer record's value? I have looked at other questions such as Use schema to convert AVRO messages with Spark to DataFrame, Handling schema changes in running Spark Streaming application, but they are not dealing with the consumerRecord in the firstplace.

回答1:

You can use below snippet : stream is the DStream of consumer record returned from kafkaUtils api of kafka010 :

stream.foreachRDD(rdd =>
    if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        val topicValueStrings = rdd.map(record => (record.value()).toString)
        val df = sqlContext.read.json(topicValueStrings)
        df.show()
    })


回答2:

I'm new to scala\kafka\spark myself, so I'm not sure if this exactly answers the question, but it would have helped me out. I'm sure there is a better way than this, so hopefully someone with more experience can come along and provide a better answer.

// KafkaRDD
stream.foreachRDD { rdd => {

  // pull the values I'm looking for into a string array
  var x = rdd.map(row => row.value()).collect()

  // convert to dataframe
  val df = spark.createDataFrame(x).toDF("record")

  // write data frame to datastore (MySQL in my case)
  df.write
    .mode(SaveMode.Append)
    .jdbc(url, table, props)

  }
}