I am using Spark 2.0.2 with Kafka 0.11.0, and I am trying to consume messages from Kafka in Spark Streaming. Here is the code:
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val topics = "notes"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:7092",
  "schema.registry.url" -> "http://localhost:7070", // used by the Avro deserializer
  "group.id" -> "connect-cluster1",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
val topicSet: Set[String] = Set(topics)

// Note: KafkaAvroDeserializer yields Avro objects rather than Strings; the
// String type parameters compile only because they are erased at runtime.
val stream = KafkaUtils.createDirectStream[String, String](
  SparkStream.ssc,
  PreferConsistent,
  Subscribe[String, String](topicSet, kafkaParams)
)
stream.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    // Runs on the executors: print each decoded record value
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  }
}
If the Kafka topic contains records, the output looks like this:
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312886984, "createdby": "karthik", "notes": "testing20"}
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312890472, "createdby": "karthik", "notes": "testing21"}
Thus, the received messages are Avro-decoded, as seen from the ConsumerRecord's value. Now I need those records as a DataFrame, but I do not know how to proceed from here, even with the schema at hand, retrieved as follows:
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.avro.Schema

// Fetch the latest value schema for the topic from the Schema Registry
val schemaRegistry = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val m = schemaRegistry.getLatestSchemaMetadata(topics + "-value")
val schemaId = m.getId
val schemaString = m.getSchema

// Reuse the same client for the decoder instead of creating a second one
val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)

val avroSchema = new Schema.Parser().parse(schemaString)
println(avroSchema)
with the schema printed as follows:
{"type":"record","name":"notes","namespace":"db","fields":[{"name":"id","type":["null","string"],"default":null},{"name":"createdat","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"createdby","type":["null","string"],"default":null},{"name":"notes","type":["null","string"],"default":null}],"connect.name":"db.notes"}
Can anyone help me understand how to get a DataFrame from the ConsumerRecord's value? I have looked at other questions such as Use schema to convert AVRO messages with Spark to DataFrame and Handling schema changes in running Spark Streaming application, but they do not deal with the ConsumerRecord in the first place.
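For what it is worth, the closest I have come is a rough sketch that leans on GenericRecord.toString rendering the record as JSON, letting Spark infer the schema per batch (which throws away the types from the registry schema); the SparkSession handling here is my guess at the usual pattern:

import org.apache.spark.sql.SparkSession

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Lazily get (or create) a SparkSession on the driver
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    // GenericRecord.toString emits JSON, so let Spark parse and infer a schema
    val df = spark.read.json(rdd.map(_.value().toString))
    df.show()
  }
}

Is there a proper way to use the registry schema (or a StructType derived from it) to build the DataFrame instead?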