Spark Kafka Streaming CommitAsync Error [duplicate]

Posted 2019-04-17 11:07

Question:

This question already has an answer here:

  • Exception while accessing KafkaOffset from RDD (1 answer)

I am new to Scala and the RDD concept. I am reading messages from Kafka with the Kafka streaming API in Spark and trying to commit the offsets after the business logic completes, but I am getting an error.

Note: I am using repartition for parallel processing.

How do I read the offsets from the stream API and commit them back to Kafka?

    scalaVersion := "2.11.8"
    val sparkVersion = "2.2.0"
    val connectorVersion = "2.0.7"
    val kafka_stream_version = "1.6.3"
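For context, a minimal build.sbt sketch that these versions imply, assuming the spark-streaming-kafka-0-10 integration is the connector in use (the artifact list is an assumption, not taken from the question):

    // build.sbt sketch -- assumes the 0.10 Kafka integration; adjust if a different connector is used
    scalaVersion := "2.11.8"

    val sparkVersion = "2.2.0"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"                 % sparkVersion,
      "org.apache.spark" %% "spark-streaming"            % sparkVersion,
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
    )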

Code

    val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
    ssc.checkpoint("C:/Gnana/cp")

    val kafkaStream = {

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "ignite3",

        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )

      val topics = Array("test")
      val numPartitionsOfInputTopic = 2
      val streams = (1 to numPartitionsOfInputTopic).map { _ =>
        KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
        ).map(_.value())
      }

      val unifiedStream = ssc.union(streams)
      val sparkProcessingParallelism = 1
      unifiedStream.repartition(sparkProcessingParallelism)
    }
    // Finding offsetRanges
    var offsetRanges = Array[OffsetRange]() // holder filled in by the transform below
    kafkaStream
      .transform { rdd =>
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }
    // do business operation and persist offsets to Kafka
    kafkaStream.foreachRDD { rdd =>
      println("offsetRanges:" + offsetRanges)
      rdd.foreach { conRec =>
        println(conRec)
        kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    }

    println(" Spark parallel reader is ready !!!")

    ssc.start()
    ssc.awaitTermination()
  }

Error

    java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.TransformedDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
        at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:525)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
        at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:512)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)

Answer 1:

Don't repartition before calculating the offsetRanges. If you do, you will run into exactly this issue. To verify, simply remove the repartition and run the application again. A minimal sketch of the corrected flow follows.
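This sketch reuses the question's Kafka parameters (topic, bootstrap servers, group id) and assumes a SparkSession named spark, as in the question. The offsets are read from each batch's RDD before any shuffle, and commitAsync is called on the driver against the original direct stream rather than inside the executor-side foreach:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val ssc = new StreamingContext(spark.sparkContext, Seconds(2))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "ignite3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // keep a reference to the original direct stream: only it implements CanCommitOffsets
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("test"), kafkaParams))

    stream.foreachRDD { rdd =>
      // read the offsets first, while the RDD is still the Kafka RDD (before any shuffle)
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // the business logic may repartition a derived RDD; the offsets are already captured
      rdd.map(_.value()).repartition(1).foreach(record => println(record))

      // commit on the driver, against the original stream, after the batch's work is done
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()

The key points are that only the RDDs produced directly by createDirectStream can be cast to HasOffsetRanges, and only the original input stream can be cast to CanCommitOffsets; once you union, repartition, or map the stream, those casts fail or the DStream ends up captured inside an executor closure, which is what triggers the NotSerializableException above.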