Kafka Consumer for Spark written in Scala for Kafka API 0.10

Posted 2019-05-17 19:58

I am upgrading the Kafka API in my Spark Scala app to v0.10. I used to have a custom method for deserializing the messages, which arrive as byte strings.

I have realized there is a way to pass StringDeserializer or ByteArrayDeserializer as a parameter for either the key or the value.

However, I cannot find any information on how to create a custom Avro schema deserializer so that my Kafka stream can use it when I call createDirectStream and consume data from Kafka.

Is it possible?

1 Answer

Bombasti · answered 2019-05-17 20:49

It is possible. You need to implement the Deserializer<T> interface defined in org.apache.kafka.common.serialization and point key.deserializer or value.deserializer at your custom class via the Kafka parameters held by the ConsumerStrategy[K, V]. For example:

import java.util
import org.apache.kafka.common.serialization.Deserializer

class AvroDeserializer extends Deserializer[Array[Byte]] {
  // No per-instance configuration is needed for this simple example
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()
  // Decode the Avro payload here; this skeleton just passes the raw bytes through
  override def deserialize(topic: String, bytes: Array[Byte]): Array[Byte] = bytes
}
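
If you want the deserializer to hand Spark a decoded record instead of raw bytes, the deserialize body can use Avro's reader classes directly. Here is a minimal sketch, assuming a hypothetical code-generated SpecificRecord class MyRecord and the org.apache.avro library on the classpath:

import java.util
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.kafka.common.serialization.Deserializer

// MyRecord is a hypothetical Avro-generated SpecificRecord class
class MyRecordDeserializer extends Deserializer[MyRecord] {
  private val reader = new SpecificDatumReader[MyRecord](MyRecord.getClassSchema)

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()

  override def deserialize(topic: String, bytes: Array[Byte]): MyRecord = {
    if (bytes == null) null
    else {
      // Wrap the raw payload in a binary decoder and read one record
      val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
      reader.read(null, decoder)
    }
  }
}

With a typed deserializer like this, the stream's value type in the code below would be MyRecord instead of Array[Byte].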

And then:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import my.location.with.AvroDeserializer

val ssc: StreamingContext = ???
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[AvroDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("sometopic")
// The stream's value type must match what the value deserializer produces —
// Array[Byte] for the AvroDeserializer defined above
val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc,
  PreferConsistent,
  Subscribe[String, Array[Byte]](topics, kafkaParams)
)
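
From there the stream can be consumed like any other direct stream; a minimal usage sketch:

// Each record's value is whatever the value deserializer produced —
// here the raw Avro payload bytes
stream.foreachRDD { rdd =>
  rdd.foreach { record =>
    println(s"key=${record.key}, payload=${record.value.length} bytes")
  }
}
ssc.start()
ssc.awaitTermination()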