I'm trying to solve a problem such that I've got a dataset like this:
(1, 3)
(1, 4)
(1, 7)
(1, 2) <-
(2, 7) <-
(6, 6)
(3, 7) <-
(7, 4) <-
...
Since (1 -> 2)
and (2 -> 7)
, I would like to replace the set (2, 7)
as (1, 7)
similarly, (3 -> 7)
and (7 -> 4)
also replace (7,4)
as (3, 4)
Hence, my dataset becomes
(1, 3)
(1, 4)
(1, 7)
(1, 2)
(1, 7)
(6, 6)
(3, 7)
(3, 4)
...
Any idea how to solve or tackle this ?
Thanks
This problem looks like a transitive closure of a graph, represented in the form of a distributed list of edges.
One of the key features of Spark, when compared to older Hadoop MR is that Spark supports interactive algorithms. To solve a graph traversal problem like this, we exploit that capability in a recursive function:
def closure(rdd:RDD[(Int, Int)]):RDD[(Int,Int)] = {
val transitiveValues = rdd.map(_.swap).join(rdd).filter{case (_,(x,y)) => x != y}
if (transitiveValues.isEmpty) {
rdd
} else {
val usedTransitions = transitiveValues.flatMap{case (a,(x,y)) => Seq((x,a),(a,y))}
val newTransitions = transitiveValues.map{case (a,(x,y)) => (x,y)}
closure(rdd.subtract(usedTransitions).union(newTransitions)).distinct
}
}
This does not exactly results in the output expected above, because there's no notion of precedence (implicit ordering), so closure((1, 2),(2, 7)) = (1,7)
and not in (1, 2), (1, 7)
as expected above. Ordering can be added at the cost of extra complexity. Also, it does not support cyclic graphs (with loops).
This algorithm should serve only as starting point to be tuned to the specific internal requirements.