JavaRDD<String> history_ = sc.emptyRDD();
java.util.Queue<JavaRDD<String>> queue = new LinkedList<JavaRDD<String>>();
queue.add(history_);
JavaDStream<String> history_dstream = ssc.queueStream(queue);
JavaPairDStream<String, ArrayList<String>> history = history_dstream.mapToPair(r -> {
    return new Tuple2<String, ArrayList<String>>(null, null);
});
JavaPairInputDStream<String, GenericData.Record> stream_1 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_1);
JavaPairInputDStream<String, GenericData.Record> stream_2 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_2);
Then I apply some transformations and create two DStreams, Data_1 and Data_2, of type JavaPairDStream<String, ArrayList<String>>, and do the join as below. I then filter out the records for which there was no joining key and save them in history, so they can be used in the next batch by taking their union with Data_1:
Data_1 = Data_1.union(history);
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> joined =
Data_1.leftOuterJoin(Data_2).cache();
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> notNULL_join = joined.filter(r -> r._2._2().isPresent());
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> dstream_filtered = joined.filter(r -> !r._2._2().isPresent());
history = dstream_filtered.mapToPair(r -> {
    return new Tuple2<>(r._1, r._2._1);
}).persist();
I do get a non-empty history after the previous step (checked by saving it to HDFS), but this history is still empty in the next batch when the union is done.
It's conceptually not possible to "remember" a DStream. DStreams are time-bound and on each clock-tick (called "batch interval") the DStream represents the observed data in the stream during that period of time.

Hence, we cannot have an "old" DStream saved to join with a "new" DStream. All DStreams live in the "now".

The underlying data structure of DStreams is the RDD: each batch interval, our DStream will have one RDD of the data for that interval. RDDs represent a distributed collection of data. RDDs are immutable and permanent, for as long as we have a reference to them.

We can combine RDDs and DStreams to create the "history roll over" that's required here. It looks pretty similar to the approach in the question, but only using the history RDD.
Here's a high-level view of the suggested changes:
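The code below is a minimal Java sketch of that roll-over, not a drop-in implementation. It assumes the same sc, ssc, Data_1 and Data_2 as in the question; the names historyRef and withHistory are illustrative; and the Optional import is the Spark 2.x one (with the Spark 1.x Kafka API shown in the question it would be Guava's com.google.common.base.Optional instead).

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

// Start from an empty history RDD; the reference is swapped at every batch interval.
JavaRDD<Tuple2<String, ArrayList<String>>> empty = sc.emptyRDD();
AtomicReference<JavaPairRDD<String, ArrayList<String>>> historyRef =
    new AtomicReference<>(JavaPairRDD.fromJavaRDD(empty));

// On each batch, merge the current history RDD into Data_1 before joining.
JavaPairDStream<String, ArrayList<String>> withHistory =
    Data_1.transformToPair(rdd -> rdd.union(historyRef.get()));

JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> joined =
    withHistory.leftOuterJoin(Data_2).cache();

// After each join, keep the records that found no match as the history for the next batch.
joined.foreachRDD(rdd -> {
    JavaPairRDD<String, ArrayList<String>> unmatched = rdd
        .filter(r -> !r._2()._2().isPresent())
        .mapToPair(r -> new Tuple2<>(r._1(), r._2()._1()));
    historyRef.get().unpersist(false);  // release the previous batch's history
    historyRef.set(unmatched.cache());  // cache the new history for the next batch
});

The key difference from the question's version is that the union happens inside transformToPair, so each batch reads the history RDD reference at processing time, instead of unioning with a queueStream that only ever sees the initial empty RDD.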
This is only a starting point. There are additional considerations with regard to checkpointing; otherwise the lineage of the history RDD will grow unbounded until a StackOverflowError happens. This blog post is quite complete on this particular technique: http://www.spark.tc/stateful-spark-streaming-using-transform/

I also recommend using Scala instead of Java. The Java syntax is too verbose to use with Spark Streaming.
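To make the checkpointing note concrete, here is one illustrative way to truncate the history lineage. It is a variant of the foreachRDD from the sketch above and reuses its historyRef and joined variables; the checkpoint directory and the every-10-batches cadence are assumptions, not something the linked post prescribes.

import java.util.concurrent.atomic.AtomicLong;

// Needed once, before the streaming context is started, so RDD checkpoints have somewhere to go.
sc.setCheckpointDir("hdfs:///tmp/history-checkpoints");  // example path

AtomicLong batches = new AtomicLong();  // driver-side batch counter
joined.foreachRDD(rdd -> {
    JavaPairRDD<String, ArrayList<String>> unmatched = rdd
        .filter(r -> !r._2()._2().isPresent())
        .mapToPair(r -> new Tuple2<>(r._1(), r._2()._1()))
        .cache();
    if (batches.incrementAndGet() % 10 == 0) {
        unmatched.checkpoint();  // cut the lineage accumulated by the repeated unions
        unmatched.count();       // force evaluation so the checkpoint is written now
    }
    historyRef.get().unpersist(false);
    historyRef.set(unmatched);
});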