I have been writing Spark Streaming jobs that consume and produce data through Kafka. I use a direct DStream, so I have to manage offsets myself, and we chose Redis to store and read them. Now I have a problem: when my client launches, it needs to get the offsets from Redis, not the offsets that exist in Kafka itself. How should I write my code? This is what I have so far:
kafka_stream = KafkaUtils.createDirectStream(
    ssc,
    topics=[config.CONSUME_TOPIC],
    kafkaParams={"bootstrap.servers": config.CONSUME_BROKERS,
                 "auto.offset.reset": "largest"},
    fromOffsets=read_offset_range(config.OFFSET_KEY))
But I think fromOffsets is only the value read from Redis when the Spark Streaming client launches, and it is not read again while the job is running. Thank you for helping.
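
To make the intended flow concrete, here is a minimal sketch of the pattern described above: read the offsets from Redis once at launch and pass them as fromOffsets, then write each batch's end offsets back to Redis while the job runs. Several things here are assumptions rather than the code in question: the offsets are assumed to live in a Redis hash that maps partition number to offset, `redis_client` is assumed to be a redis-py client (version 3.5 or later for `hset(..., mapping=...)`), and `parse_offsets`, `format_offsets`, `build_stream`, and `process` are hypothetical names. The Spark side uses the Kafka 0.8 direct API from `pyspark.streaming.kafka`.

```python
def parse_offsets(raw, topic):
    """Convert a Redis hash {partition: offset} into {(topic, partition): offset}.

    Assumed layout: one hash per topic, with string or bytes fields.
    """
    offsets = {}
    for partition, offset in raw.items():
        if isinstance(partition, bytes):
            partition = partition.decode("utf-8")
        if isinstance(offset, bytes):
            offset = offset.decode("utf-8")
        offsets[(topic, int(partition))] = int(offset)
    return offsets


def format_offsets(offset_ranges):
    """Convert offset ranges into the {partition: untilOffset} hash to store."""
    return {str(r.partition): str(r.untilOffset) for r in offset_ranges}


def build_stream(ssc, redis_client, topic, brokers, offset_key, process):
    """Create a direct stream that starts from Redis and saves offsets per batch."""
    # Imported here so the helpers above can be used without Spark installed.
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

    # Read once, at launch: this is the only time fromOffsets is consulted.
    stored = parse_offsets(redis_client.hgetall(offset_key), topic)
    from_offsets = {TopicAndPartition(t, p): o for (t, p), o in stored.items()}

    stream = KafkaUtils.createDirectStream(
        ssc,
        [topic],
        {"bootstrap.servers": brokers, "auto.offset.reset": "largest"},
        # With no stored offsets, fall back to auto.offset.reset.
        fromOffsets=from_offsets or None)

    def handle_batch(rdd):
        ranges = rdd.offsetRanges()   # offsets covered by this batch
        process(rdd)                  # hypothetical user-supplied processing
        mapping = format_offsets(ranges)
        if mapping:
            # Saved only after processing, so a failed batch is replayed.
            redis_client.hset(offset_key, mapping=mapping)

    stream.foreachRDD(handle_batch)
    return stream
```

In this sketch Redis is read only at startup and written after every batch, so a restart resumes from the last batch that finished processing. Because the offsets are saved after `process` returns, a crash between the two steps replays that batch, which gives at-least-once behaviour.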