In my Spark application I create a DStream from a Kafka topic in the following way:
```scala
import kafka.message.MessageAndMetadata
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.kafka.KafkaUtils

KafkaUtils
  .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder, (String, Array[Byte])](
    streamingContext,
    kafkaParams,
    offset.get,
    { message: MessageAndMetadata[String, Array[Byte]] => (message.key(), message.message()) }
  )
```
and later I commit the offsets back to the Kafka topic, obtaining them with an `asInstanceOf` cast:
```scala
directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = number of Kafka partitions being consumed
  ...
}
```
In this case everything is OK, but if I repartition the DStream, then when I try to commit the offsets I get the following exception:

```
java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
```
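Roughly, the failing pattern is the following (the partition count is arbitrary):

```scala
// repartition() yields plain RDDs that no longer mix in HasOffsetRanges,
// so the cast below throws ClassCastException.
directKafkaStream
  .repartition(10) // any value reproduces the issue
  .foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // throws here
  }
```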
Can someone help me?
Why do you `repartition` at all?! I'd say it is not allowed, given that the number of partitions (in `KafkaRDD`) is exactly the number of so-called offset ranges (i.e. the topic partitions you read records from). You'd then "damage" what Spark has calculated to be the best parallelism and distribution.

Moreover, only `KafkaRDD` is `HasOffsetRanges`, and the official documentation, in *Obtaining Offsets*, says so (quoted at the end of this answer).
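Here is the relevant declaration, paraphrased from the Kafka 0.8 connector's source (the exact signature varies between connector versions):

```scala
// Paraphrased: KafkaRDD is the only RDD that mixes in HasOffsetRanges.
private[kafka] class KafkaRDD[K, V, U <: Decoder[_], T <: Decoder[_], R](
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange],
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges
```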
With `RDD.repartition` you simply create a `CoalescedRDD` (in the so-called RDD lineage); see the definition sketched below.
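This is how `repartition` is defined on `org.apache.spark.rdd.RDD` (modulo version differences):

```scala
// repartition is just a shuffling coalesce, so the result is an ordinary RDD
// (a MapPartitionsRDD over a CoalescedRDD) without HasOffsetRanges.
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
```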
As the resulting RDD does not have `HasOffsetRanges` mixed in, you get the `ClassCastException`.

If you want to increase the parallelism (and have more offset ranges in Spark's `KafkaRDD`), increase the number of partitions in the topic(s) and Spark Streaming will handle that nicely for you.

Quoting the Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher), section *Obtaining Offsets* (highlighting mine):

> Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the result of createDirectStream, not later down a chain of methods. Be aware that **the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition**, e.g. reduceByKey() or window().
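For completeness: if you really must repartition before acting on the records, the pattern from the integration guide is to capture the offset ranges in the very first transformation on the stream, before anything breaks the cast. A minimal sketch, reusing `directKafkaStream` from the question:

```scala
// Grab the offset ranges from the raw KafkaRDD first, then repartition freely.
// (Mind the usual caveats about driver-side vars in streaming jobs.)
var offsetRanges = Array.empty[OffsetRange]

directKafkaStream
  .transform { rdd =>
    // First method on the stream: the HasOffsetRanges cast still succeeds here.
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
  .repartition(10) // assumed partition count
  .foreachRDD { rdd =>
    for (o <- offsetRanges) {
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
  }
```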