I am trying to access a collection of filtered DStreams obtained as in the solution to this question: Spark Streaming - Best way to Split Input Stream based on filter Param
I create the collection as follows:
val statusCodes = Set("200", "500", "404")
spanTagStream.cache()
val statusCodeStreams = statusCodes.map(key =>
  key -> spanTagStream.filter(x =>
    x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key)))
I try to access statusCodeStreams in the following way:
for (streamTuple <- statusCodeStreams) {
  streamTuple._2.foreachRDD(rdd =>
    rdd.foreachPartition(partitionOfRecords => {
      val props = new HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      partitionOfRecords.foreach { x =>
        /* Code writing to Kafka using streamTuple._1 as the topic String */
      }
    })
  )
}
When executing this I receive the following error:
java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects
How do I access the Streams to write to Kafka in a serializable way?
As the exception indicates, the DStream definition is being captured by the closure. A simple option is to declare the DStream as transient: @transient flags a field to be excluded from the Java serialization of the object graph it belongs to. The key to this scenario is that a val declared in the same scope as the DStream (statusCodeStreams in this case) is used within the closure. The actual reference to that val from within the closure is outer.statusCodeStreams, which causes the serialization process to "pull" the whole context of outer into the closure. With @transient we mark the DStream (and also the StreamingContext) declarations as non-serializable and avoid the serialization issue. Depending on the code structure (e.g. if everything is linear in one main function, which is bad practice by the way), it might be necessary to mark ALL DStream declarations plus the StreamingContext instance as @transient.
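A minimal sketch of where the annotation would go, assuming the declarations live as fields of the driver object (how spanTagStream is actually built, i.e. the Kafka direct stream plus parsing, is omitted and marked as an assumption):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object StatusCodeRouting {
  // Marked @transient so these driver-side references are not dragged into
  // the serialized task closures that merely refer to the enclosing scope.
  @transient lazy val ssc: StreamingContext =
    new StreamingContext(new SparkConf().setAppName("status-code-routing"), Seconds(5))

  // Stand-in for the question's spanTagStream (assumed shape: a 3-tuple whose
  // third element is a tag map); its construction is not shown here.
  @transient lazy val spanTagStream: DStream[(String, String, Map[String, AnyRef])] = ???

  val statusCodes = Set("200", "500", "404")

  @transient lazy val statusCodeStreams = statusCodes.map(key =>
    key -> spanTagStream.filter(x =>
      x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key)))
}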
If the only intent of the initial filtering is to 'route' the content to separate Kafka topics, it might be worth moving the filtering within the foreachRDD. That would make for a simpler program structure.
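A rough sketch of that alternative, using a single foreachRDD over spanTagStream and routing each record to the topic named after its status code. The record layout follows the question, while the value serialization via toString and the kafkaServers value are placeholder assumptions:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val statusCodes = Set("200", "500", "404")
val kafkaServers = "localhost:9092" // placeholder for the question's kafkaServers

spanTagStream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // One producer per partition, as in the question.
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    partitionOfRecords.foreach { x =>
      // Use the record's status code as the topic name; drop other codes.
      val statusCode = x._3.get("http.status_code").getOrElse("").asInstanceOf[String]
      if (statusCodes.contains(statusCode)) {
        producer.send(new ProducerRecord[String, String](statusCode, x.toString))
      }
    }
    producer.close()
  }
}

Only the small, serializable statusCodes set is captured by the closure here, so no DStream reference leaks into the task.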