I am new to Scala and the RDD concept. I am reading messages from Kafka using the Kafka stream API in Spark and trying to commit the offsets after the business work is done, but I am getting an error.
Note: I am using repartition for parallel processing.
How do I read the offsets from the stream API and commit them to Kafka?
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"
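The classes used in the code in this question (`Subscribe`, `PreferConsistent`, `HasOffsetRanges`, `CanCommitOffsets`) belong to the `spark-streaming-kafka-0-10` integration, whose version follows the Spark version. The question does not show its dependency list, so the fragment below is only a guess at what the `build.sbt` might need, not the actual build:

```scala
// build.sbt sketch: assumed dependencies, not taken from the question
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  // Provides KafkaUtils.createDirectStream, HasOffsetRanges and CanCommitOffsets
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)
```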
Code
val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
ssc.checkpoint("C:/Gnana/cp")
val kafkaStream = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "ignite3",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val topics = Array("test")
  val numPartitionsOfInputTopic = 2
  val streams = (1 to numPartitionsOfInputTopic) map { _ =>
    KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    ).map(_.value())
  }
  val unifiedStream = ssc.union(streams)
  val sparkProcessingParallelism = 1
  unifiedStream.repartition(sparkProcessingParallelism)
}
// Finding offsetRanges
kafkaStream
  .transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }
// Do business operation and persist offset to Kafka
kafkaStream.foreachRDD(rdd => {
  println("offsetRanges:" + offsetRanges)
  rdd.foreach(conRec => {
    println(conRec)
    kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })
})
println(" Spark parallel reader is ready !!!")
ssc.start()
ssc.awaitTermination()
}
Error
java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.TransformedDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:525)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
    at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:512)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
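The message points at the `rdd.foreach` closure: it runs on the executors and refers to `kafkaStream`, so Spark tries to serialize the DStream itself. Separately, only the RDD that comes straight from `createDirectStream` implements `HasOffsetRanges`, so the cast can no longer succeed once `map`, `union` or `repartition` has been applied. One way to restructure this is sketched below. It is a minimal sketch, not a tested answer, and it assumes a single direct stream (which already yields one RDD partition per Kafka partition), a local master, and a hypothetical `handle` function standing in for the business work:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object CommitOffsetsSketch {

  // Hypothetical placeholder for the business work done on each message.
  def handle(value: String): String = s"processed: $value"

  def main(args: Array[String]): Unit = {
    // Assumption: local master and app name chosen only for this sketch.
    val conf = new SparkConf().setAppName("commit-offsets-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "ignite3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Keep a reference to the untransformed direct stream: it is the only
    // object that implements CanCommitOffsets.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("test"), kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Driver side: read the offsets before any transformation of the RDD.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Executor side: extract the String value before repartitioning, because
      // ConsumerRecord is not serializable. The closure refers only to the
      // record value and the singleton object, never to the stream.
      rdd.map(_.value())
        .repartition(2)
        .foreach(value => println(handle(value)))

      // Driver side again: queue the commit once the work for this batch is done.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The body of `foreachRDD` runs on the driver, so referring to `stream` there does not put it into a task closure. Only the function passed to `rdd.foreach` is shipped to the executors.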