Simple Spark Structured Streaming equivalent of KafkaUtils.createRDD

Posted 2019-08-16 06:23

Question:

How to read data in a kafka topic to an RDD by specifying start and end offsets?

KafkaUtils.createRDD was experimental and its API is rather unpleasant: it returns the big, bloated Java ConsumerRecord class (which is not even serialisable) inside a KafkaRDD, which overrides a lot of methods (like persist) to just throw an exception.

What I would like, is a straightforward API like this:

case class Message(key: String, 
                   value: String, 
                   offset: Long, 
                   timestamp: Long)

def readKafka(topic: String, offsetsByPartition: Map[Int, (Long, Long)])
             (config: KafkaConfig, sc: SparkContext): RDD[Message]

Or something similar where key: Array[Byte] and value: Array[Byte].
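For example (just a sketch; RawMessage is an illustrative name, not an existing type):

case class RawMessage(key: Array[Byte],
                      value: Array[Byte],
                      offset: Long,
                      timestamp: Long)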

Answer 1:

To read from Kafka between given start and end offsets, the code looks like this (see the Structured Streaming + Kafka integration guide):

val df =
  spark.read                       // batch read; endingOffsets is only supported for batch queries
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1,topic2")
    // in the offset JSON, -2 means earliest and -1 means latest
    .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
    .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
    .load()

The above reads the records that fall within those offset ranges into a DataFrame; you can then cast the binary key and value columns to strings and map each row to your Message case class:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.col
import spark.implicits._   // provides the Encoder needed by .as[Message]

val messageRDD: RDD[Message] =
  df.select(
      col("key").cast("string"),
      col("value").cast("string"),
      col("offset").cast("long"),
      col("timestamp").cast("long"))
    .as[Message]
    .rdd
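
Putting the pieces together, a minimal sketch of the readKafka helper from the question could look like the following. It is an assumption-laden sketch, not an existing API: KafkaConfig here only holds the bootstrap servers, the helper takes a SparkSession rather than a SparkContext (the Kafka source lives in the DataFrame API), and the per-partition offset JSON building is my own adaptation.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical config holder; replace with your own KafkaConfig.
case class KafkaConfig(bootstrapServers: String)

def readKafka(topic: String, offsetsByPartition: Map[Int, (Long, Long)])
             (config: KafkaConfig, spark: SparkSession): RDD[Message] = {
  import spark.implicits._

  // Build the per-partition offset JSON the Kafka source expects,
  // e.g. {"topic1":{"0":23,"1":100}}
  def offsetsJson(pick: ((Long, Long)) => Long): String = {
    val partitions = offsetsByPartition
      .map { case (partition, range) => s""""$partition":${pick(range)}""" }
      .mkString(",")
    s"""{"$topic":{$partitions}}"""
  }

  spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", config.bootstrapServers)
    .option("subscribe", topic)
    .option("startingOffsets", offsetsJson(_._1))   // start offset per partition
    .option("endingOffsets", offsetsJson(_._2))     // end offset per partition
    .load()
    .select(
      col("key").cast("string"),
      col("value").cast("string"),
      col("offset").cast("long"),
      col("timestamp").cast("long"))
    .as[Message]
    .rdd
}

With that in place, something like readKafka("topic1", Map(0 -> (23L, 50L)))(KafkaConfig("host1:port1"), spark) returns an RDD[Message] covering partition 0 from offset 23 up to offset 50.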