Accessing Collection of DStreams

Posted 2019-03-01 17:32

Question:

I am trying to access a collection of filtered DStreams obtained as in the solution to this question: Spark Streaming - Best way to Split Input Stream based on filter Param

I create the Collection as follows:

val statuCodes = Set("200", "500", "404")
spanTagStream.cache()
val statusCodeStreams = statuCodes.map { key =>
  key -> spanTagStream.filter(x =>
    x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key))
}

I try to access statusCodeStreams in the following way:

for (streamTuple <- statusCodeStreams) {
  streamTuple._2.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      val props = new HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)

      partitionOfRecords.foreach { x =>
        /* Code writing to Kafka using streamTuple._1 as the topic String */
      }
    }
  }
}

When executing this I receive the following error:

java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects

How do I access the Streams to write to Kafka in a serializable way?

Answer 1:

As the exception indicates, the DStream definition is being captured by the closure. A simple option is to declare this DStream transient:

@transient val spanTagStream = //KafkaUtils.create...

@transient flags certain fields to be excluded from the Java serialization of an object graph. The key point in this scenario is that a val declared in the same scope as the DStream (statusCodeStreams in this case) is used within the closure. The actual reference to that val from within the closure is outer.statusCodeStreams, which causes the serialization process to "pull" the whole context of outer into the closure. With @transient we mark the DStream (and also the StreamingContext) declarations as non-serializable, and we avoid the serialization issue. Depending on the code structure (for example, if it is all linear in one main function, which is bad practice, by the way), it might be necessary to mark ALL DStream declarations plus the StreamingContext instance as @transient.
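
A minimal sketch of what that can look like when everything lives in one main function. The element type of spanTagStream, the batch interval, and the Spark/Kafka setup are assumptions here, since the question does not show them:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object SpanRouter {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("span-router")

    // Marked @transient so these references are dropped when Spark
    // serializes the closures of the RDD operations further down.
    @transient val ssc = new StreamingContext(sparkConf, Seconds(5))
    @transient val spanTagStream: DStream[(String, String, Map[String, Any])] =
      ??? // e.g. the KafkaUtils.createDirectStream result from the question

    val statuCodes = Set("200", "500", "404")
    @transient val statusCodeStreams = statuCodes.map { key =>
      key -> spanTagStream.filter(
        _._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key))
    }

    // ... foreachRDD / foreachPartition writing to Kafka, as in the question ...

    ssc.start()
    ssc.awaitTermination()
  }
}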

If the only intent of the initial filtering is to "route" the content to separate Kafka topics, it might be worth moving the filtering inside the foreachRDD. That would make for a simpler program structure:

spanTagStream.foreachRDD { rdd =>
  rdd.cache()
  statuCodes.foreach { code =>
    val matchingCodes = rdd.filter(
      _._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(code))
    matchingCodes.foreachPartition { partitionOfRecords =>
      // write this partition to Kafka, using `code` as the topic
    }
  }
  rdd.unpersist(true)
}
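
For completeness, a sketch of how the foreachPartition body could look when the producer setup from the question is folded in. It assumes spanTagStream, statuCodes and kafkaServers are defined as above; the record element types and the use of x._1/x._2 as the Kafka key and value are assumptions:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

spanTagStream.foreachRDD { rdd =>
  rdd.cache()
  statuCodes.foreach { code =>
    rdd.filter(_._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(code))
      .foreachPartition { partitionOfRecords =>
        // One producer per partition per batch, created on the executor,
        // so nothing non-serializable is captured from the driver.
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)

        partitionOfRecords.foreach { x =>
          // The status code itself is used as the topic name, as in the question.
          producer.send(new ProducerRecord[String, String](code, x._1, x._2))
        }
        producer.close()
      }
  }
  rdd.unpersist(true)
}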