In my Spark application I create a DStream from a Kafka topic in the following way:
KafkaUtils
  .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder, (String, Array[Byte])](
    streamingContext,
    kafkaParams,
    offset.get,
    { message: MessageAndMetadata[String, Array[Byte]] => (message.key(), message.message()) }
  )
and later I commit the offsets back to Kafka, using an asInstanceOf cast:
directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = # of Kafka partitions being consumed
  ...
}
In this case everything is OK, but if I repartition the DStream, I get the following exception when I try to commit the offsets:
java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
Can someone help me?
Why do you repartition at all?! I'd say it is not allowed, given that the number of partitions (in KafkaRDD) is exactly the number of so-called offset ranges (i.e. the topic partitions you read records from). You'd "damage" what Spark has calculated to be the best parallelism and distribution. Have a look at KafkaRDD.getPartitions:
override def getPartitions: Array[Partition] = {
  offsetRanges.zipWithIndex.map { case (o, i) =>
    new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
  }.toArray
}
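You can check that 1:1 mapping yourself per batch; a minimal sketch, assuming the same directKafkaStream as in the question:

directKafkaStream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // One RDD partition per Kafka topic partition (one per offset range).
  assert(ranges.length == rdd.partitions.length)
}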
Moreover, only KafkaRDD is HasOffsetRanges:
private[spark] class KafkaRDD[K, V](
...
) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges
And the official documentation, in Obtaining Offsets, says exactly that:
Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the result of createDirectStream, not later down a chain of methods. Be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().
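So if you really have to repartition, grab the offset ranges before any shuffle. Here is a minimal sketch; the partition count 16 and the process/storeOffsets helpers are hypothetical placeholders, not part of the question:

directKafkaStream.foreachRDD { rdd =>
  // Cast first: this must be the first method called on the RDD
  // returned by createDirectStream, before any shuffle.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Repartitioning is fine afterwards, on the derived RDD.
  rdd.repartition(16).foreachPartition { records =>
    records.foreach(process) // hypothetical record handler
  }

  // Persist the offsets captured earlier, e.g. to ZooKeeper or an
  // external store (storeOffsets is a hypothetical helper).
  storeOffsets(offsetRanges)
}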
With RDD.repartition you simply create a CoalescedRDD (in the so-called RDD lineage):
...
new CoalescedRDD(
  new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
    new HashPartitioner(numPartitions)),
  numPartitions,
  partitionCoalescer).values
} else {
  new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
As that RDD does not have HasOffsetRanges mixed in, you get the ClassCastException.
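That is exactly the failure from the question; a one-liner reproduces it (the 8 below is an arbitrary partition count):

directKafkaStream.foreachRDD { rdd =>
  // repartition returns a MapPartitionsRDD (on top of a CoalescedRDD),
  // matching the MapPartitionsRDD in the exception message.
  rdd.repartition(8).asInstanceOf[HasOffsetRanges] // ClassCastException
}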
If you want to increase the parallelism (and have more offset ranges in Spark's KafkaRDD), increase the number of partitions in the topic(s), and Spark Streaming will handle that nicely for you.
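For completeness, a sketch of growing a topic with Kafka's AdminClient (available in kafka-clients 1.0+; on a 0.10-era cluster the kafka-topics.sh tool does the same job). The broker address, topic name and target count are assumptions:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker
val admin = AdminClient.create(props)

// Grow "my-topic" to 16 partitions; the direct stream then sees
// 16 offset ranges (and hence 16 RDD partitions) per batch.
admin.createPartitions(Map("my-topic" -> NewPartitions.increaseTo(16)).asJava).all().get()
admin.close()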
Quoting the Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher):
The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata.