Is there a way to use a schema to convert Avro messages from Kafka with Spark into a DataFrame? This is the schema file for user records:
{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}
And here are code snippets, taken from the SqlNetworkWordCount example and from Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages, for reading in the messages:
object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

...

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

  df.show()
})

case class User(firstName: String, lastName: String)
Somehow I can't find another way than using a case class to convert Avro messages to a DataFrame. Is there a possibility to use the schema instead? I'm using Spark 1.6.2 and Kafka 0.10.
The complete code, in case you're interested.
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
object ReadMessagesFromKafka {

  object Injection {
    val parser = new Schema.Parser()
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]) {
    val brokers = "127.0.0.1:9092"
    val topics = "test"

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct Kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Deserialize the Avro payload and map each GenericRecord onto the case class
      val df = rdd.map(message => Injection.injection.invert(message._2).get)
        .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

      df.show()
    })

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
For anyone interested in handling this in a way that copes with schema changes without needing to stop and redeploy your Spark application (assuming your app logic can handle this), see this question/answer.
Please take a look at this: https://github.com/databricks/spark-avro/blob/master/src/test/scala/com/databricks/spark/avro/AvroSuite.scala
So instead of mapping each record to a case class, you can try the approach used there.
I worked on a similar issue, but in Java, so I'm not sure about Scala; still, take a look at the library com.databricks.spark.avro.
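As an illustration, here is a minimal Scala sketch of using that library, assuming an existing SparkContext sc and a hypothetical Avro file at /tmp/users.avro; spark-avro infers the Spark schema from the Avro schema, so no case class is needed:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// spark-avro derives the DataFrame schema from the Avro schema embedded in the file
val df = sqlContext.read.format("com.databricks.spark.avro").load("/tmp/users.avro")
df.select("firstName", "lastName").show()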
The OP has probably resolved the issue already, but for future reference: I solved this problem quite generally, so I thought it might be helpful to post here.
Generally speaking, you should convert the Avro schema to a Spark StructType, convert the objects in your RDD to Row[Any], and then use:
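For example (a hedged sketch; rowRDD and structType are hypothetical names for the converted RDD[Row] and the converted schema):

val df = sqlContext.createDataFrame(rowRDD, structType)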
To convert the Avro schema, I used spark-avro, like so:
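A minimal sketch of that conversion, assuming avroSchema is the parsed org.apache.avro.Schema and that SchemaConverters is public in your spark-avro version (it is in later releases):

import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// toSqlType maps the Avro schema to a Spark SQL type; for an Avro record this is a StructType
val structType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]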
The conversion of the RDD was trickier. If your schema is simple, you can probably just do a simple map, something like this:
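A hedged sketch (rdd and the object type are hypothetical; the field names follow the example described below):

import org.apache.spark.sql.Row

// Each object becomes a Row whose elements line up with the StructType fields
val rowRDD = rdd.map(obj => Row(obj.name, obj.age))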
In this example the object has two fields, name and age.
The important thing is to make sure the elements in the Row match the order and types of the fields in the StructType from before.
In my particular case I had a much more complex object which I wanted to handle generically to support future schema changes, so my code was much more complex.
The method suggested by the OP should also work in some cases, but will be hard to apply to complex objects (not primitives or case classes).
Another tip: if you have a class within a class, convert that inner class to a Row as well, so that the wrapping class will be converted to something like:
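A hedged sketch with hypothetical field names; the inner object simply becomes a nested Row:

// The outer record contributes its own fields plus a nested Row for the inner object
val nestedRow = Row(outerField, Row(innerField1, innerField2))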
You can also look at the spark-avro project I mentioned earlier for how to convert objects to Rows; I used some of the logic there myself.
If someone reading this needs further help, ask me in the comments and I'll try to help.
A similar problem is also solved here.