I have created a Kafka stream in a Python Spark app and can parse any text that comes through it.
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
I want to change this to parse Avro messages from a Kafka topic instead. When parsing Avro messages from a file, I do it like this:
reader = DataFileReader(open("customer.avro", "rb"), DatumReader())
I'm new to Python and Spark. How do I change the stream to parse the Avro messages? And how can I specify a schema to use when reading the Avro message from Kafka? I've done all this in Java before, but Python is confusing me.
Edit:
I tried changing the stream to include an Avro decoder:
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1},valueDecoder=avro.io.DatumReader(schema))
but I get the following error:
TypeError: 'DatumReader' object is not callable
As mentioned by @Zoltan Fedor in the comments, the provided answer is a bit old now, as 2.5 years have passed since it was written. The confluent-kafka-python library has evolved to support the same functionality natively. The only change needed in the given code is the following.
And then, you can change this line -
I have tested it and it works nicely. I am adding this answer for anyone who may need it in the future.
I had the same challenge - deserializing Avro messages from Kafka in PySpark - and solved it with the Confluent Schema Registry module's MessageSerializer class, as in our case the schema is stored in a Confluent Schema Registry.
You can find that module at https://github.com/verisign/python-confluent-schemaregistry
As you can see, this code uses the new direct approach with no receivers, hence createDirectStream (see more at https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html).
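To show how the pieces fit together: `createDirectStream` also accepts a `valueDecoder` callable, so the registry-backed serializer just needs a small adapter with that signature. This is a sketch under my own assumptions - the helper name `make_value_decoder` is mine, and it only assumes the serializer exposes `decode_message(bytes)`, as `MessageSerializer` from python-confluent-schemaregistry does:

```python
def make_value_decoder(serializer):
    """Adapt a schema-registry serializer to the valueDecoder(raw_bytes)
    signature expected by KafkaUtils.createDirectStream.

    `serializer` is assumed to expose decode_message(bytes) -> dict,
    as MessageSerializer from python-confluent-schemaregistry does.
    """
    def decode(raw_bytes):
        # Kafka can deliver null values (e.g. tombstones); pass them through.
        if raw_bytes is None:
            return None
        return serializer.decode_message(raw_bytes)
    return decode
```

Wiring it up would then look roughly like this (the broker and registry addresses are placeholders):
client = CachedSchemaRegistryClient(url='http://localhost:8081')
serializer = MessageSerializer(client)
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, valueDecoder=make_value_decoder(serializer))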