I am trying to access a collection of filtered DStreams obtained as in the solution to this question: Spark Streaming - Best way to Split Input Stream based on filter Param
I create the collection as follows:
val statuCodes = Set("200", "500", "404")
spanTagStream.cache()
val statusCodeStreams = statuCodes.map(key =>
  key -> spanTagStream.filter(x =>
    x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key)))
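If I understand correctly, statusCodeStreams is then a Set of (status code, filtered DStream) pairs, one per entry in statuCodes.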
I try to access statusCodeStreams in the following way:
for (streamTuple <- statusCodeStreams) {
  streamTuple._2.foreachRDD(rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      val props = new HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      partitionOfRecords.foreach { x =>
        /* Code writing to Kafka using streamTuple._1 as the topic String */
      }
    }
  )
}
When executing this, I receive the following error:
java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects
How do I access the streams to write to Kafka in a serializable way?
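My guess is that the inner closure captures the whole streamTuple, and with it the filtered DStream. Would extracting the topic string into a plain local val before calling foreachRDD avoid that capture? A rough, untested sketch of what I have in mind (reusing statusCodeStreams and kafkaServers from above; the producer.send call is just a placeholder for my actual write logic):

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

for (streamTuple <- statusCodeStreams) {
  val topic = streamTuple._1 // plain String, so it should serialize fine
  streamTuple._2.foreachRDD(rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      val props = new HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      partitionOfRecords.foreach { x =>
        // Only `topic` (a String) is referenced here, not streamTuple itself;
        // x.toString is a placeholder for the real record value.
        producer.send(new ProducerRecord[String, String](topic, x.toString))
      }
      producer.close()
    }
  )
}

Is that the right way to think about the closure, or is there a better pattern for writing each filtered stream to its own Kafka topic?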