I have the following line in my Kafka consumer's code:
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
How do I deserialize the stream "lines" back into the original objects? On the Kafka producer side, serializability was implemented by making the class extend Serializable. I am implementing this in Spark using Scala.
You need to implement a custom Decoder and provide the expected type information together with the decoder to the createStream function.
KafkaUtils.createStream[KeyType, ValueType, KeyDecoder, ValueDecoder] (...)
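Since the producer in the question relies on Java serialization (the class extends Serializable), the value decoder can simply reverse it with ObjectInputStream. The sketch below assumes the producer wrote each message with ObjectOutputStream (for example through a matching Kafka Encoder). To stay runnable without Kafka on the classpath, it declares a local Decoder trait that mirrors the single fromBytes method of Kafka 0.8's kafka.serializer.Decoder[T]; the names JavaSerializationDecoder and Reading are hypothetical. In a real job you would extend kafka.serializer.Decoder[T] instead and give the class a constructor taking kafka.utils.VerifiableProperties, because Spark's Kafka receiver creates decoders reflectively through that constructor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Local stand-in for kafka.serializer.Decoder[T] (Kafka 0.8) so this compiles on its own.
trait Decoder[T] {
  def fromBytes(bytes: Array[Byte]): T
}

// Hypothetical decoder that reverses plain Java serialization.
// With real Kafka this would be:
//   class JavaSerializationDecoder[T](props: VerifiableProperties = null) extends kafka.serializer.Decoder[T]
class JavaSerializationDecoder[T] extends Decoder[T] {
  override def fromBytes(bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] // unchecked cast: T is erased at runtime
    finally in.close()
  }
}

// Hypothetical message type; case classes are Serializable by default.
case class Reading(sensor: String, value: Double)

object DecoderDemo {
  // What the producer side is assumed to do: write the object with ObjectOutputStream.
  def serialize(obj: Serializable): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj)
    finally out.close() // closing flushes the object stream into `bytes`
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val payload = serialize(Reading("s1", 21.5))
    val decoded = new JavaSerializationDecoder[Reading].fromBytes(payload)
    println(decoded) // Reading(s1,21.5)
  }
}
```

With the real Kafka trait, the consumer call would then look roughly like KafkaUtils.createStream[String, Reading, StringDecoder, JavaSerializationDecoder[Reading]](ssc, kafkaParams, topics, storageLevel). Note that this typed overload takes a kafkaParams map (with zookeeper.connect and group.id) instead of the zkQuorum/group arguments used in the question, and the deserialized class must be on the consumer's classpath with a compatible serialVersionUID.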
For example, if you are using String as the key and CustomContainer as the value, the first two type parameters are String and CustomContainer, the key decoder is StringDecoder, and the value decoder is your own decoder for CustomContainer.

Given that you are encoding the messages to Kafka as new KeyedMessage[String,String], the right decoder is a string decoder (kafka.serializer.StringDecoder) for both key and value, i.e. KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](...). That will give you a DStream[(String, String)] as the basis for your processing.

If you want to send/receive a specific object type, you need to implement a Kafka Encoder and Decoder for it. Luckily for you, PcapPacket already implements the methods you need for that:

PcapPacket -> byte[]: public int transferStateAndDataTo(byte[] buffer)
byte[] -> PcapPacket: public PcapPacket(byte[] buffer)
The rest is boilerplate code to implement the Encoder/Decoder interfaces required by Kafka.
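A minimal sketch of that boilerplate follows. To keep it runnable without Kafka or jNetPcap, it declares local Encoder/Decoder traits mirroring the toBytes/fromBytes methods of Kafka 0.8's kafka.serializer.Encoder[T] and Decoder[T], and uses a hypothetical StubPacket exposing the same byte[] constructor and transferStateAndDataTo(byte[]) as PcapPacket. It also assumes a getTotalSize() method that reports how large the buffer must be; check the exact sizing method in the jNetPcap Javadoc before relying on it. In a real project the two classes would extend the Kafka traits, take a kafka.utils.VerifiableProperties constructor argument (Kafka and Spark instantiate them reflectively), and the encoder's class name would go in the producer's serializer.class property.

```scala
import java.util.Arrays

// Local stand-ins for Kafka 0.8's kafka.serializer.Encoder[T] and Decoder[T],
// declared here only so the sketch compiles without Kafka on the classpath.
trait Encoder[T] { def toBytes(t: T): Array[Byte] }
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }

// Hypothetical stand-in for org.jnetpcap.packet.PcapPacket with the members the
// answer relies on: a byte[] constructor, transferStateAndDataTo(byte[]) and
// an (assumed) getTotalSize() giving the required buffer length.
final class StubPacket(data: Array[Byte]) {
  def getTotalSize: Int = data.length
  def transferStateAndDataTo(buffer: Array[Byte]): Int = {
    System.arraycopy(data, 0, buffer, 0, data.length)
    data.length // number of bytes written
  }
  def contents: Array[Byte] = data.clone()
}

// Producer side: packet -> byte[] via transferStateAndDataTo.
// With Kafka: class PcapPacketEncoder(props: VerifiableProperties = null) extends kafka.serializer.Encoder[PcapPacket]
class StubPacketEncoder extends Encoder[StubPacket] {
  override def toBytes(p: StubPacket): Array[Byte] = {
    val buffer = new Array[Byte](p.getTotalSize)
    val written = p.transferStateAndDataTo(buffer)
    // Trim in case fewer bytes were written than were allocated.
    if (written == buffer.length) buffer else Arrays.copyOf(buffer, written)
  }
}

// Consumer side: byte[] -> packet via the byte[] constructor.
// With Kafka: class PcapPacketDecoder(props: VerifiableProperties = null) extends kafka.serializer.Decoder[PcapPacket]
class StubPacketDecoder extends Decoder[StubPacket] {
  override def fromBytes(bytes: Array[Byte]): StubPacket = new StubPacket(bytes)
}

object PacketCodecDemo {
  def main(args: Array[String]): Unit = {
    val original = new StubPacket(Array[Byte](1, 2, 3, 4))
    val wire = new StubPacketEncoder().toBytes(original)
    val restored = new StubPacketDecoder().fromBytes(wire)
    println(restored.contents.mkString(",")) // 1,2,3,4
  }
}
```

On the Spark side, the decoder class then goes into the fourth type parameter of KafkaUtils.createStream, exactly as with any other custom decoder.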