Why does Spark Streaming fail with ClassCastException?

Posted 2019-07-24 23:30

In my Spark application I create a DStream from a Kafka topic in the following way:

  import kafka.message.MessageAndMetadata
  import kafka.serializer.{DefaultDecoder, StringDecoder}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val directKafkaStream = KafkaUtils
    .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder, (String, Array[Byte])](
      streamingContext,
      kafkaParams,
      offset.get,
      { message: MessageAndMetadata[String, Array[Byte]] => (message.key(), message.message()) }
    )

and later I commit the offsets back to Kafka, obtaining the offset ranges by casting the RDD with asInstanceOf:

  directKafkaStream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // offsetRanges.length = # of Kafka partitions being consumed
    ... }

In this case everything is OK, but if I repartition the DStream, I get the following exception when I try to commit the offsets:

java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

Can someone help me?

1 Answer

Explosion°爆炸 · 2019-07-25 00:19

Why do you repartition at all?! I'd say it is not allowed, given that the number of partitions of a KafkaRDD is exactly the number of so-called offset ranges (i.e. the topic partitions you read records from). Repartitioning would "damage" what Spark has calculated to be the best parallelism and distribution. You can see the one-to-one mapping in KafkaRDD.getPartitions:

  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
    }.toArray
  }

Moreover, KafkaRDD is the only RDD that mixes in HasOffsetRanges:

private[spark] class KafkaRDD[K, V](
  ...
  ) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges
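
For reference, the trait itself is tiny; the whole cast hinges on a single member, and in the Spark sources it is essentially just:

  trait HasOffsetRanges {
    // One OffsetRange (topic, partition, fromOffset, untilOffset)
    // per Kafka topic partition consumed in this batch
    def offsetRanges: Array[OffsetRange]
  }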

And the official documentation, in the section Obtaining Offsets, says so:

Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the result of createDirectStream, not later down a chain of methods. Be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().
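
In practice that means capturing the offset ranges before any transformation. A minimal sketch of the safe pattern (the repartition factor and processRecord are placeholders for illustration, not from the original code):

  directKafkaStream.foreachRDD { rdd =>
    // Cast FIRST, while rdd is still the KafkaRDD produced by createDirectStream
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // Shuffling is safe now; the offset ranges are already captured
    val repartitioned = rdd.repartition(10)

    repartitioned.foreachPartition { records =>
      records.foreach(processRecord) // processRecord: hypothetical per-record handler
    }

    // ... commit offsetRanges back to Kafka here ...
  }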

With RDD.repartition you simply add a CoalescedRDD (and, when shuffling, a ShuffledRDD) to the so-called RDD lineage:

...
  new CoalescedRDD(
    new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
    new HashPartitioner(numPartitions)),
    numPartitions,
    partitionCoalescer).values
} else {
  new CoalescedRDD(this, numPartitions, partitionCoalescer)
}

The trailing .values turns the result into a MapPartitionsRDD, which is exactly the class named in the exception. As none of these RDDs have HasOffsetRanges mixed in, you get the ClassCastException.
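
You can verify this yourself inside foreachRDD by printing the runtime classes (a throwaway diagnostic, not part of any fix):

  directKafkaStream.foreachRDD { rdd =>
    println(rdd.getClass.getName)                 // ...streaming.kafka.KafkaRDD
    println(rdd.repartition(10).getClass.getName) // ...rdd.MapPartitionsRDD
  }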

If you want to increase the parallelism (and have more offset ranges in Spark's KafkaRDD), increase the number of partitions in the topic(s) and Spark Streaming will handle that nicely for you.
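
For example, with recent Kafka client libraries you can grow a topic's partition count programmatically via AdminClient; a sketch in which the broker address, topic name, and target count are all assumptions for illustration:

  import java.util.{Collections, Properties}
  import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}

  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker
  val admin = AdminClient.create(props)
  try {
    // Grow the hypothetical "my-topic" to 8 partitions;
    // partition counts can only be increased, never decreased
    admin
      .createPartitions(Collections.singletonMap("my-topic", NewPartitions.increaseTo(8)))
      .all()
      .get()
  } finally {
    admin.close()
  }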

Quoting the Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher):

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata.
