I have a DStream of (Key, Value) pairs.
mapped2.foreachRDD(rdd => {
  rdd.foreachPartition(p => {
    p.foreach(x => {
    })
  })
})
I need to make sure that all items with identical keys are processed in the same partition and by one core, so that they are effectively processed sequentially.
How can I do this? Can I use groupByKey, even though it is inefficient?
You can use PairDStreamFunctions.combineByKey:
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

/**
  * Created by Yuval.Itzchakov on 29/11/2016.
  */
object GroupingDStream {
  def main(args: Array[String]): Unit = {
    val pairs: DStream[(String, String)] = ???
    val numberOfPartitions: Int = ???

    val groupedByIds: DStream[(String, List[String])] = pairs.combineByKey[List[String]](
      (s: String) => List(s),                                          // createCombiner: start each key's list with its first value
      (strings: List[String], s: String) => s +: strings,              // mergeValue: prepend every additional value
      (first: List[String], second: List[String]) => first ++ second,  // mergeCombiners: merge partial lists across partitions
      new HashPartitioner(numberOfPartitions))

    groupedByIds.foreachRDD(rdd => {
      rdd.foreach((kvp: (String, List[String])) => {
        // kvp._1 is the key, kvp._2 contains all of its values
      })
    })
  }
}
The result of combineByKey would be a tuple with the first element being the key and the second element being a collection of the values. Note that I used (String, String) for the sake of simplicity, as you haven't provided any types.
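For instance, with a hypothetical batch of (String, String) pairs, the grouping would look roughly like this (the order of values inside each list is not guaranteed):

// Hypothetical input batch of (String, String) pairs:
//   ("a", "x"), ("b", "y"), ("a", "z")
// Resulting batch of (String, List[String]) tuples after combineByKey:
//   ("a", List("z", "x")), ("b", List("y"))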
Then use foreach to iterate over the list of values and process them sequentially if you need to. Note that if you need to apply additional transformations, you can use DStream.map and operate on the second element (the list of values) instead of using foreachRDD.
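For example, here is a minimal sketch of that approach (the count-per-key transformation and the processed stream name are just illustrations, not from your code), building on groupedByIds from the example above:

// Derive a per-key result from the grouped values with DStream.map,
// then act on the already-grouped stream.
val processed: DStream[(String, Int)] = groupedByIds.map {
  case (key, values) => (key, values.size) // e.g. count the values per key
}

processed.foreachRDD(rdd => {
  rdd.foreach { case (key, count) =>
    // all of this key's values were already grouped into a single partition
    println(s"$key -> $count")
  }
})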