Spark Kafka Streaming CommitAsync Error [duplicate

2019-04-17 10:41发布

This question already has an answer here:

I am new to Scala and RDD concept. Reading message from kafka using Kafka stream api in Spark and trying to commit after business work. but I am getting error.

Note: Using repartition for Parallel work

How to read offset from stream APi and commit it to Kafka ?

scalaVersion := "2.11.8" val sparkVersion = "2.2.0" val connectorVersion = "2.0.7" val kafka_stream_version = "1.6.3"

Code

    val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
    ssc.checkpoint("C:/Gnana/cp")

    val kafkaStream = {

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "ignite3",

        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )

      val topics = Array("test")
      val numPartitionsOfInputTopic = 2
      val streams = (1 to numPartitionsOfInputTopic) map {
        _ => KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ).map(_.value())
      }

      val unifiedStream = ssc.union(streams)
      val sparkProcessingParallelism = 1
      unifiedStream.repartition(sparkProcessingParallelism)
    }
//Finding offsetRanges
kafkaStream
  .transform {
    rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
  }
//do business operation and persist offset to kafka
kafkaStream.foreachRDD(rdd=> {
  println("offsetRanges:"+offsetRanges)
  rdd.foreach(conRec=> {
    println(conRec)
    kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })
})

    println(" Spark parallel reader is ready !!!")

   ssc.start()
    ssc.awaitTermination()
  }

Error

java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.TransformedDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects. at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:525) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512) at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:512) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)

1条回答
一纸荒年 Trace。
2楼-- · 2019-04-17 11:26

Dont repartition before calculating offsetRanges.If you do so then will encounter this issue.To test you simply remove repartition and then try running this application.

查看更多
登录 后发表回答