Spark - Get earliest and latest offset of Kafka without opening stream

Posted 2019-06-28 05:45

I am currently using spark-streaming-kafka-0-10_2.11 to connect my Spark application to a Kafka queue. Streams work fine, but in one specific scenario I need the entire content of the Kafka queue exactly once; for this I was advised to use KafkaUtils.createRDD instead (see SparkStreaming: Read Kafka Stream and provide it as RDD for further processing).

However, with spark-streaming-kafka-0-10_2.11 I cannot figure out how to get the earliest and latest offsets of my Kafka topic, which I need in order to build the OffsetRange that has to be handed off to the createRDD method.

What is the recommended way to get those offsets without opening a stream? Any help would be greatly appreciated.
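For reference, here is roughly the call I am trying to make; the per-partition fromOffset/untilOffset values are exactly what I am missing. The broker address, topic name, group id, and the hard-coded 0/100 offsets below are placeholders, and sc is an existing SparkContext:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

val kafkaParams = new java.util.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")   // placeholder broker
kafkaParams.put("key.deserializer", classOf[StringDeserializer].getName)
kafkaParams.put("value.deserializer", classOf[StringDeserializer].getName)
kafkaParams.put("group.id", "whole-topic-once")          // placeholder group id

// These hard-coded offsets are the unknown part: I need the real
// earliest (fromOffset) and latest (untilOffset) values per partition
val ranges = Array(OffsetRange("mytopic", 0, 0L, 100L))

val rdd = KafkaUtils.createRDD[String, String](
  sc, kafkaParams, ranges, LocationStrategies.PreferConsistent)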

1 Answer
\"骚年 ilove
2楼-- · 2019-06-28 06:20

After reading several discussions, I am able to get the earliest or latest offset of a specific partition with:

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// Old low-level (pre-0.10) consumer API
val consumer = new SimpleConsumer(host, port, timeout, bufferSize, "offsetfetcher")
val topicAndPartition = TopicAndPartition(topic, initialPartition)
// OffsetRequest.EarliestTime returns the first available offset;
// use OffsetRequest.LatestTime for the last one
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets

offsets.head

Still, how to replicate the behaviour of the --from-beginning flag of the kafka-console-consumer.sh CLI with the KafkaUtils.createRDD approach is something I have not fully worked out.
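If the cluster runs kafka-clients 0.10.1 or newer, the new consumer API exposes beginningOffsets and endOffsets, which should give the same range that --from-beginning would read, without subscribing or polling any records. A minimal sketch (broker address, topic, and deserializers are placeholders; I have not verified this against the exact client version spark-streaming-kafka-0-10 pulls in):

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scala.collection.JavaConverters._

// Fetch [earliest, latest) offsets for every partition of a topic,
// without subscribing or consuming any messages
def offsetRangesFor(topic: String, bootstrapServers: String): Array[OffsetRange] = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try {
    val partitions = consumer.partitionsFor(topic).asScala
      .map(p => new TopicPartition(p.topic, p.partition)).asJava
    val earliest = consumer.beginningOffsets(partitions).asScala // "from beginning"
    val latest   = consumer.endOffsets(partitions).asScala       // current log end
    earliest.map { case (tp, from) =>
      OffsetRange(tp.topic, tp.partition, from, latest(tp))
    }.toArray
  } finally {
    consumer.close()
  }
}

The resulting array, e.g. offsetRangesFor("mytopic", "localhost:9092"), can then be passed straight to KafkaUtils.createRDD to read the whole topic exactly once.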
