Not able to persist the DStream for use in next batch

Published 2019-02-20 04:58

Question:

JavaRDD<String> history_ = sc.emptyRDD();

java.util.Queue<JavaRDD<String> > queue = new LinkedList<JavaRDD<String>>();
queue.add(history_);
JavaDStream<String> history_dstream = ssc.queueStream(queue);

JavaPairDStream<String,ArrayList<String>> history = history_dstream.mapToPair(r -> {
  return new Tuple2< String,ArrayList<String> >(null,null);
});  

 JavaPairInputDStream<String, GenericData.Record> stream_1 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_1);


JavaPairInputDStream<String, GenericData.Record> stream_2 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_2);

Then I do some transformations and create two DStreams, Data_1 and Data_2, of type

JavaPairDStream<String, ArrayList<String>>

and do the join as below. I then filter out those records for which there was no joining key and save them in history, so it can be used in the next batch by unioning it with Data_1:

 Data_1 = Data_1.union(history);

JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> joined =
    Data_1.leftOuterJoin(Data_2).cache();


JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> notNULL_join = joined.filter(r -> r._2._2().isPresent());
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> dstream_filtered = joined.filter(r -> !r._2._2().isPresent());

history = dstream_filtered.mapToPair(r -> {
  return new Tuple2<>(r._1,r._2._1);
}).persist();

I do get history after the previous step (I checked by saving it to HDFS), but this history is still empty in the next batch when doing the union.

Answer 1:

It's conceptually not possible to "remember" a DStream. DStreams are time-bound and on each clock-tick (called "batch interval") the DStream represents the observed data in the stream during that period of time.

Hence, we cannot have an "old" DStream saved to join with a "new" DStream. All DStreams live in the "now".

The underlying data structure of a DStream is the RDD: for each batch interval, the DStream produces one RDD containing the data observed during that interval. RDDs represent a distributed collection of data; they are immutable and remain available for as long as we hold a reference to them.
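For example (a minimal sketch; `ssc` is the StreamingContext and `lines` is a hypothetical DStream[String]), foreachRDD exposes the RDD backing each batch interval:

// `lines` is a placeholder DStream; on every batch interval Spark hands us
// the RDD that holds exactly that interval's data.
lines.foreachRDD { (rdd, time) =>
  println(s"Batch at $time contains ${rdd.count()} records")
}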

We can combine RDDs and DStreams to create the "history roll over" that's required here.

It looks quite similar to the approach in the question, but it keeps the history as a plain RDD instead of wrapping it in a DStream.

Here's a high-level view of the suggested changes:

var history: RDD[(String, List[String])] = sc.emptyRDD[(String, List[String])]

val dstream1 = ...
val dstream2 = ...

val historyDStream = dstream1.transform(rdd => rdd.union(history))
val joined = historyDStream.leftOuterJoin(dstream2) // leftOuterJoin so records without a matching key can still be filtered out and kept as history

... do stuff with joined as above, obtain dstreamFiltered ...

dstreamFiltered.foreachRDD{rdd =>
   val formatted = rdd.map{case (k,(v1,v2)) => (k,v1)} // get rid of the join info
   history.unpersist(false) // unpersist the 'old' history RDD
   history = formatted // assign the new history
   history.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
   history.count() //action to materialize this transformation
}

This is only a starting point. There are additional considerations with regard to checkpointing: without it, the lineage of the history RDD will grow unbounded until a StackOverflowError eventually occurs. This blog post covers this particular technique quite thoroughly: http://www.spark.tc/stateful-spark-streaming-using-transform/
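As a minimal sketch of what that could look like, here is the same foreachRDD loop as above with checkpointing added (the checkpoint directory is a placeholder, and checkpointing on every batch is shown only for brevity):

// The directory below is a placeholder. StreamingContext.checkpoint also sets the
// SparkContext checkpoint directory, so RDD.checkpoint() has somewhere to write.
ssc.checkpoint("hdfs:///tmp/history-checkpoint")

dstreamFiltered.foreachRDD { rdd =>
  val formatted = rdd.map { case (k, (v1, v2)) => (k, v1) }
  history.unpersist(false)
  history = formatted
  history.persist(StorageLevel.MEMORY_AND_DISK)
  history.checkpoint() // truncates the lineage; must be called before the first action on this RDD
  history.count()      // action that materializes both the persist and the checkpoint
}

In practice you would likely checkpoint only every N batches, since writing the RDD out on every interval adds I/O overhead.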

I also recommend using Scala instead of Java; the Java syntax is too verbose to use with Spark Streaming.