I'm currently working with Kafka / Zookeeper and pySpark (1.6.0).
I have successfully created a Kafka consumer that uses KafkaUtils.createDirectStream().
The streaming itself works without problems, but I noticed that the offsets of my Kafka topics are not updated after I have consumed some messages.
This is a problem for us, because we need up-to-date offsets to have monitoring in place.
In the Spark documentation I found this snippet and comment:
    offsetRanges = []

    def storeOffsetRanges(rdd):
        global offsetRanges
        offsetRanges = rdd.offsetRanges()
        return rdd

    def printOffsetRanges(rdd):
        for o in offsetRanges:
            print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)

    directKafkaStream\
        .transform(storeOffsetRanges)\
        .foreachRDD(printOffsetRanges)
You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
Here is the documentation: http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
I found a solution in Scala, but I can't find an equivalent for python. Here is the Scala example: http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
Question
So the question is: how can I update Zookeeper from that point on?
I wrote some functions to save and read Kafka offsets with the Python kazoo library.
First, a function to get a singleton Kazoo client:
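The original snippet for this step is not preserved here, so what follows is only a minimal sketch of what such a singleton helper might look like. The names `ZOOKEEPER_SERVERS`, `get_zookeeper_instance` and the `client_factory` parameter are assumptions made for illustration; only `KazooClient(hosts=...)` and `start()` come from kazoo itself.

```python
ZOOKEEPER_SERVERS = "127.0.0.1:2181"  # assumption: adjust to your ensemble

_zk_client = None  # one shared client per Python process


def get_zookeeper_instance(hosts=ZOOKEEPER_SERVERS, client_factory=None):
    """Return a started Zookeeper client, creating it on first use only.

    client_factory is a hypothetical hook so that something other than
    KazooClient can be plugged in (for example a stub in tests).
    """
    global _zk_client
    if _zk_client is None:
        if client_factory is None:
            # Imported lazily so the module loads even where kazoo is absent.
            from kazoo.client import KazooClient
            client_factory = KazooClient
        _zk_client = client_factory(hosts=hosts)
        _zk_client.start()  # connect once; later calls reuse the connection
    return _zk_client
```

Every later call returns the same connected client, so the connection is opened only once per process.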
Then, functions to read and write offsets:
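The original functions are not preserved here either. A rough sketch, under the assumption that offsets live under `/consumers/[group]/offsets/[topic]/[partition]` and that `zk` is a started kazoo client, might look like this. The function names and the made-up `group` argument are illustrative; `ensure_path`, `set`, `get`, `exists` and `get_children` are kazoo client methods.

```python
def offsets_path(group, topic, partition=None):
    """Build the Zookeeper node path for a topic or one of its partitions."""
    path = "/consumers/%s/offsets/%s" % (group, topic)
    if partition is not None:
        path = "%s/%d" % (path, partition)
    return path


def save_offsets(zk, group, offset_ranges):
    """Write the untilOffset of each offset range to its partition node."""
    for o in offset_ranges:
        path = offsets_path(group, o.topic, o.partition)
        zk.ensure_path(path)  # creates missing parent nodes as well
        zk.set(path, str(o.untilOffset).encode("ascii"))


def read_offsets(zk, group, topics):
    """Return {(topic, partition): offset} for everything stored so far."""
    offsets = {}
    for topic in topics:
        topic_path = offsets_path(group, topic)
        if not zk.exists(topic_path):
            continue  # nothing saved yet for this topic
        for partition in zk.get_children(topic_path):
            data, _stat = zk.get("%s/%s" % (topic_path, partition))
            if data:  # skip nodes that were created but never written
                offsets[(topic, int(partition))] = int(data.decode("ascii"))
    return offsets
```

`save_offsets` would typically be called from `foreachRDD` with `rdd.offsetRanges()`, which is only available on the RDDs produced directly by the direct stream.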
Then, before starting the stream, you can read the offsets from Zookeeper and pass them to createDirectStream as the fromOffsets argument.
I ran into a similar problem. You are right: the direct stream uses the low-level Kafka API directly, which does not update the consumer offsets. There are a couple of examples for Scala/Java around, but none for Python. It is easy to do yourself, though: store the offset of each partition after a batch has been processed, and read the stored offsets back when the job starts.
For example, I save the offset for each partition in Redis by doing:
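The snippet that belonged here is missing, so the following is only a sketch of one way to do it. It assumes one Redis hash per topic, named `kafka:offsets:[topic]`, with the partition id as the field; that layout and the function name are hypothetical. `hset(name, key, value)` is a redis-py client method.

```python
OFFSET_KEY_PREFIX = "kafka:offsets"  # assumption: hypothetical key layout


def save_offsets_to_redis(client, offset_ranges, prefix=OFFSET_KEY_PREFIX):
    """Store the untilOffset of every offset range in a per-topic hash."""
    for o in offset_ranges:
        key = "%s:%s" % (prefix, o.topic)
        # field = partition id, value = first offset not yet consumed
        client.hset(key, str(o.partition), str(o.untilOffset))
```

It could be wired in with `directKafkaStream.foreachRDD(lambda rdd: save_offsets_to_redis(client, rdd.offsetRanges()))`, where `client` is, for instance, a `redis.StrictRedis(host="localhost", port=6379)` instance (host and port are placeholders).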
Then, at startup, you can use:
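Again, the original code is not available; this sketch assumes the offsets were stored in one Redis hash per topic named `kafka:offsets:[topic]` (a hypothetical layout) and shows how they might be turned into the `fromOffsets` argument. The helper names are made up; `hgetall`, `TopicAndPartition` and `KafkaUtils.createDirectStream(..., fromOffsets=...)` are existing redis-py and pySpark 1.6 APIs.

```python
OFFSET_KEY_PREFIX = "kafka:offsets"  # assumption: hypothetical key layout

try:
    long_type = long  # Python 2: py4j may otherwise send a Java Integer
except NameError:
    long_type = int   # Python 3 has a single integer type


def load_offsets_from_redis(client, topics, prefix=OFFSET_KEY_PREFIX):
    """Return {(topic, partition): offset} for all stored partitions."""
    offsets = {}
    for topic in topics:
        stored = client.hgetall("%s:%s" % (prefix, topic))
        for partition, offset in stored.items():
            offsets[(topic, int(partition))] = int(offset)
    return offsets


def create_stream_from_offsets(ssc, topics, kafka_params, offsets):
    """Start a direct stream at the stored offsets (empty dict = defaults)."""
    # Imported lazily so the loader above works without pySpark installed.
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
    from_offsets = dict(
        (TopicAndPartition(topic, partition), long_type(offset))
        for (topic, partition), offset in offsets.items()
    )
    return KafkaUtils.createDirectStream(
        ssc, topics, kafka_params, fromOffsets=from_offsets)
```

On the very first run nothing is stored yet, so the dictionary is empty and the stream should fall back to the usual `auto.offset.reset` behaviour.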
For tools that use Zookeeper to track offsets, it is better to save the offsets in Zookeeper. This page describes how to set the offset: https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html. Basically, the Zookeeper node is /consumers/[consumer_name]/offsets/[topic name]/[partition id]. Since we are using the direct stream, you have to make up a consumer name.