I can't do a transformation on an RDD[ConsumerRecord]


Question:

I have a serialization problem with ConsumerRecords recovered from a Kafka topic in a DStream. To illustrate it, I created the following example: I build a ConsumerRecord with a JSON value, put it inside an RDD, and call collect to print the result. This throws a serialization error. Here is the example:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
object Exemple {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val conf = new SparkConf().setMaster("local[*]").setAppName("Exemple")
    val sc = new SparkContext(conf)

    // ConsumerRecord(topic, partition, offset, key, value)
    val consumerRecord: ConsumerRecord[String, String] = new ConsumerRecord("", 1, 2L, "myTopic", """{"id":1}""")
    val rdd = sc.parallelize(List(consumerRecord))
    val valueRDD = rdd.map(cr => cr.value)
    valueRDD.collect.foreach(println) // this line throws error

  }
}

I get the following error:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
[Stage 0:>                                                          (0 + 0) / 4]18/03/02 00:58:55 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:59)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:472)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:453)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:453)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:295)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:290)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:375)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:373)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:373)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:370)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:370)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:85)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/03/02 00:58:55 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:59)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply(ParallelCollectionRDD.scala:51)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:189)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:472)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:453)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:453)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:295)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:290)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:375)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:373)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:373)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:370)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:370)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:85)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/03/02 00:58:55 ERROR TaskSetManager: Failed to serialize task 3, not attempting to retry it.
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = , partition = 1, offset = 2, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = myTopic, value = {"id":1}))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 1)
    - field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(ConsumerRecord(topic = , partition = 1, offset = 2, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = myTopic, value = {"id":1})))
    - writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
    - object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@694)
    - field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
    - object (class org.apache.spark.scheduler.ResultTask, ResultTask(0, 3))
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:472)
    at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:453)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:453)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:295)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:290)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:375)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:373)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:373)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:370)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:370)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:85)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/03/02 00:58:55 ERROR TaskSchedulerImpl: Resource offer failed, task set TaskSet_0.0 was not serializable
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 3, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = , partition = 1, offset = 2, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = myTopic, value = {"id":1}))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 1)
    - field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(ConsumerRecord(topic = , partition = 1, offset = 2, NoTimestampType = -1, checksum = -1, serialized key size = -1, serialized value size = -1, key = myTopic, value = {"id":1})))
    - writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
    - object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@694)
    - field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
    - object (class org.apache.spark.scheduler.ResultTask, ResultTask(0, 3))
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at Exemple$.main(Exemple.scala:20)
    at Exemple.main(Exemple.scala)

Process finished with exit code 1

Do you have any ideas?

Answer 1:

org.apache.kafka.clients.consumer.ConsumerRecord is not serializable, so there are several ways to solve this kind of problem:

  1. Register the class with Kryo serialization (see the sketch after this list).
  2. Declare the instance only within the lambda function passed to map.
  3. Make the NotSerializable object static and create it once per machine.
  4. Call rdd.foreachPartition and create the NotSerializable object in there, like this:

    rdd.foreachPartition { iter =>
      // create the non-serializable object on the executor, once per partition
      val notSerializable = new NotSerializable()
      // ... now process iter
    }
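
For option 1, here is a minimal sketch of the SparkConf wiring (an illustration, not part of the original answer). Note that Kryo replaces Spark's data serializer, used for shuffles and caching; in the trace above the failure happens while Java-serializing the task itself, so for the parallelize example the more direct fix is to keep the ConsumerRecord out of the data entirely, as in the DStream sketch further below:

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Exemple")
      // switch Spark's data serializer to Kryo, which does not require java.io.Serializable
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // registration is optional with Kryo but avoids writing full class names into the output
      .registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))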

For more details, you can check this link: https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
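
In the DStream case described in the question, option 2 usually means mapping each ConsumerRecord to its value (or to a key/value tuple) inside the stream, before any collect, shuffle or checkpoint, so the record object itself never has to be serialized. A rough sketch, assuming the spark-streaming-kafka-0-10 integration and placeholder broker, group and topic names:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    // reuse the SparkContext `sc` from the example above
    val ssc = new StreamingContext(sc, Seconds(5))

    // placeholder Kafka settings, adjust to your cluster
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "localhost:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "exemple-group"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("myTopic"), kafkaParams)
    )

    // extract only the String value right away: the non-serializable ConsumerRecord
    // stays on the executors and never crosses the wire
    stream.map(record => record.value)
      .foreachRDD(rdd => rdd.collect.foreach(println))

    ssc.start()
    ssc.awaitTermination()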