How do I read data from a Kafka topic into an RDD by specifying start and end offsets?
KafkaUtils.createRDD was experimental and its API is rather unpleasant: it returns the big, bloated Java ConsumerRecord class, which is not even serialisable, and puts it in a KafkaRDD, which overrides a lot of methods (like persist) to just throw an exception.
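For concreteness, this is roughly what a read with the 0-10 integration looks like today (the broker address, group id, topic name and offsets below are placeholders, and sc is an existing SparkContext):

import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

// Placeholder consumer settings.
val kafkaParams: ju.Map[String, Object] = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "offset-range-reader"
).asJava

// One OffsetRange per partition: (topic, partition, fromOffset, untilOffset).
val offsetRanges = Array(
  OffsetRange("my-topic", 0, 100L, 200L),
  OffsetRange("my-topic", 1, 100L, 200L)
)

// The result is an RDD[ConsumerRecord[String, String]] backed by KafkaRDD.
val records: RDD[ConsumerRecord[String, String]] =
  KafkaUtils.createRDD[String, String](
    sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)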
What I would like is a straightforward API like this:
case class Message(key: String,
                   value: String,
                   offset: Long,
                   timestamp: Long)

def readKafka(topic: String, offsetsByPartition: Map[Int, (Long, Long)])
             (config: KafkaConfig, sc: SparkContext): RDD[Message]
Or something similar where key: Array[Byte] and value: Array[Byte].
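I realise I could wrap createRDD myself along these lines (an untested sketch; KafkaConfig, readKafka and Message are names of my own invention, and the records are mapped to a plain case class straight away so the non-serialisable ConsumerRecords never escape), but it seems like something the library should offer:

import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

// Hypothetical config type; just enough to build the consumer properties.
case class KafkaConfig(bootstrapServers: String, groupId: String)

def readKafka(topic: String, offsetsByPartition: Map[Int, (Long, Long)])
             (config: KafkaConfig, sc: SparkContext): RDD[Message] = {
  val kafkaParams: ju.Map[String, Object] = Map[String, Object](
    "bootstrap.servers"  -> config.bootstrapServers,
    "key.deserializer"   -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id"           -> config.groupId
  ).asJava

  // (fromOffset, untilOffset) per partition -> one OffsetRange per partition.
  val ranges = offsetsByPartition.map { case (partition, (from, until)) =>
    OffsetRange(topic, partition, from, until)
  }.toArray

  KafkaUtils
    .createRDD[String, String](sc, kafkaParams, ranges, LocationStrategies.PreferConsistent)
    // Convert to the serialisable Message case class before anything is persisted or shuffled.
    .map(r => Message(r.key, r.value, r.offset, r.timestamp))
}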