We have a DStream, such as
    val ssc = new StreamingContext(sc, Seconds(1))
    val kS = KafkaUtils.createDirectStream[String, TMapRecord](
        ssc,
        PreferConsistent,
        Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT)).
      mapPartitions(part => {
        part.map(_.value())
      }).
      mapPartitions(part1 => {
        part1.map(c => {
          TMsg(1,
            c.field1,
            c.field2, // and others
            c.startTimeSeconds
          )
        })
      })
So each RDD holds a bunch of TMsg objects with some (technical) key fields I can use to deduplicate the DStream. Basically, if two TMsg objects, IN ONE RDD OR ACROSS TWO DISCRETIZED RDDs, have the same field1 and field2 and their startTimeSeconds values differ by less than 1 second, it's a duplicate.
I looked at mapWithState.
Yes, I can create a key -> value DStream like this:
    val mappedStream = kS.map(m => (m.field1, m.field2) -> m.startTimeSeconds)
So I can call the function, but I don't understand how to use it to filter out duplicates.
Window functions don't help, and I can't use the Structured Streaming deduplication function (dropDuplicates) since the solution is written with DStreams.
Any solutions? Thanks
P.S. Spark version is 2.2
You could use mapWithState. There is a good manual on how to use stateful streaming.
In your case you could:
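1. Set a checkpoint directory:
    import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
    val ssc = new StreamingContext(sc, Seconds(1))
    // mapWithState requires checkpointing to be enabled
    ssc.checkpoint("path/to/persistent/storage")
2. Define the update function: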
    def update(key: (String, String),
               value: Option[Int],
               state: State[Int]): Option[((String, String), Int)] = {
      (value, state.getOption()) match {
        // the key already has state, so this value is a duplicate
        case (Some(_), Some(_)) => None
        case (Some(v), _) =>
          // first value for this key: you can store any value you want,
          // the state is just a marker that the key is not new
          state.update(v)
          Option((key, v))
        // no new value: the call was triggered by the state timing out
        case _ => None
      }
    }
3. Make the state spec:
    val stateSpec =
      StateSpec
        .function(update _)
        // the timeout is how long a key may stay idle before its state
        // is dropped, i.e. how long duplicates of it are still detected;
        // in this example the check interval is 1 second
        .timeout(Seconds(1))
4. Use it:
    kS
      // make key -> value pairs
      .map(m => (m.field1, m.field2) -> m.startTimeSeconds)
      .mapWithState(stateSpec)
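Note that the stream returned by mapWithState carries the Option values produced by update, so duplicates still show up as None entries. A minimal sketch of dropping them follows; the names DedupOutput, Keyed and firstOccurrences are made up for illustration and are not part of Spark:

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object, not part of Spark.
object DedupOutput {
  // Shape of one deduplicated record: ((field1, field2), startTimeSeconds).
  type Keyed = ((String, String), Int)

  // Keep only the entries that the update function emitted as Some(...);
  // the None entries (duplicates and timeout calls) are dropped.
  def firstOccurrences(marked: DStream[Option[Keyed]]): DStream[Keyed] =
    marked.flatMap(_.toList)
}
```

Passing the result of step 4 to DedupOutput.firstOccurrences should leave a plain DStream of first occurrences that can be processed as usual.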
If you want to take the last of the values instead, the update function may be:
    def update(key: (String, String),
               value: Option[Int],
               state: State[Int]): Option[((String, String), Int)] = {
      value match {
        case Some(v) =>
          // remember the most recent value, emit nothing yet
          state.update(v)
          None
        // no value is passed on timeout, so read the last one from the state
        case None if state.isTimingOut() => Option((key, state.get()))
        case None => None
      }
    }
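Both update functions above decide by arrival time: a key counts as already seen until its state times out. If the rule should follow startTimeSeconds itself, one possible variant keeps the last accepted start time in the state and compares each new value against it. This is only a sketch under assumptions: EventTimeDedup, isDuplicate, updateByEventTime and maxGapSeconds are hypothetical names, and startTimeSeconds is assumed to be an Int as in the code above.

```scala
import org.apache.spark.streaming.State

// Hypothetical helper object, not part of Spark.
object EventTimeDedup {
  // Assumed threshold: same-key messages whose start times are closer
  // than this many seconds are treated as duplicates.
  val maxGapSeconds = 1

  // Pure decision rule, kept separate so it can be checked without Spark.
  def isDuplicate(lastSeen: Option[Int], current: Int, maxGap: Int): Boolean =
    lastSeen.exists(prev => math.abs(current - prev) < maxGap)

  def updateByEventTime(key: (String, String),
                        value: Option[Int],
                        state: State[Int]): Option[((String, String), Int)] =
    value match {
      case Some(v) if isDuplicate(state.getOption(), v, maxGapSeconds) =>
        // too close to the last accepted start time for this key
        None
      case Some(v) =>
        // accept the message and remember its start time
        state.update(v)
        Some((key, v))
      case None =>
        // no value is passed when the state is timing out
        None
    }
}
```

It plugs in the same way, via StateSpec.function(EventTimeDedup.updateByEventTime _). The timeout then only bounds how long the state of an idle key is kept, so it can be longer than the gap. With whole-second Int times, a gap of 1 marks only messages with an equal start time as duplicates.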