How to implement deserialisation in a Kafka consumer

Published 2019-04-02 04:02

I have the following line in my kafka consumer's code.

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) 

How can I deserialise this stream "lines" back into the original objects? Serialisation was implemented in the Kafka producer by making the message class extend Serializable. I am implementing this in Spark using Scala.

1 answer

一纸荒年 Trace · 2019-04-02 04:38

You need to implement a custom Decoder and provide the expected type information together with the decoder to the createStream function.

KafkaUtils.createStream[KeyType, ValueType, KeyDecoder, ValueDecoder] (...)

For example, if you are using String as key and CustomContainer as value, your stream creation will look like this:

val stream = KafkaUtils.createStream[String, CustomContainer, StringDecoder, CustomContainerDecoder](...)  

Given that you are encoding the messages to Kafka as new KeyedMessage[String,String], the right decoder is the StringDecoder:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](topic,...)

That will give you a DStream[(String, String)] as the basis for your processing.
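If you stay with plain strings, the remaining work is an ordinary map over the stream that parses each message back into your object. A minimal sketch, assuming a hypothetical CustomContainer case class whose producer wrote each object as a "name,count" delimited string (the class name and wire format are illustrative, not from the original question):

```scala
// Hypothetical payload type; the producer is assumed to have serialised
// each object to the string "name,count" before sending it to Kafka.
case class CustomContainer(name: String, count: Int)

object CustomContainer {
  // Parse one Kafka message value back into the original object.
  def fromLine(line: String): CustomContainer = {
    val Array(name, count) = line.split(",", 2)
    CustomContainer(name, count.trim.toInt)
  }
}
```

In the streaming job this would then be applied as `lines.map(CustomContainer.fromLine)`, turning the DStream[String] of raw message values into a DStream[CustomContainer].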

If you want to send/receive a specific object type, you need to implement a Kafka Encoder and Decoder for it. Luckily for you, PcapPacket already implements the methods you need to do that.

The rest is boilerplate code to implement the Encoder/Decoder interfaces required by Kafka.
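That boilerplate typically amounts to a Java-serialisation round trip. The sketch below is self-contained, so it declares minimal stand-in traits mirroring Kafka's encoder/decoder shape; in a real job you would implement kafka.serializer.Encoder/Decoder instead (whose constructors also accept a VerifiableProperties argument), and Payload is a hypothetical Serializable class standing in for PcapPacket:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Minimal stand-ins for kafka.serializer.Encoder/Decoder so this sketch
// compiles without the Kafka jar; implement the real traits in your job.
trait Encoder[T] { def toBytes(t: T): Array[Byte] }
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }

// Hypothetical serialisable message class standing in for PcapPacket.
case class Payload(id: Int, data: String) extends Serializable

class PayloadEncoder extends Encoder[Payload] {
  // Serialise the object to a byte array with standard Java serialisation.
  def toBytes(p: Payload): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try { oos.writeObject(p); oos.flush(); bos.toByteArray } finally oos.close()
  }
}

class PayloadDecoder extends Decoder[Payload] {
  // Reverse the encoder: read the bytes back into the original object.
  def fromBytes(bytes: Array[Byte]): Payload = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[Payload] finally ois.close()
  }
}
```

With the real Kafka traits in place, the decoder class is what you pass as the ValueDecoder type parameter to createStream, and Spark then hands you already-deserialised objects in the DStream.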
