Spark complex grouping

Posted 2019-08-18 23:35

Question:

I have this data structure in Spark:

val df = Seq(
("Package 1", Seq("address1", "address2", "address3")),
("Package 2", Seq("address3", "address4", "address5", "address6")),
("Package 3", Seq("address7", "address8")),
("Package 4", Seq("address9")),
("Package 5", Seq("address9", "address1")),
("Package 6", Seq("address10")),
("Package 7", Seq("address8"))).toDF("Package", "Destinations")
df.show(20, false)

I need to find all the addresses that were seen together across different packages, i.e. groups of addresses linked (directly or through a chain of shared packages) to one another. I can't find a way to do that efficiently; I've tried grouping, mapping, etc. Ideally, the result for the given df would be:

+----+------------------------------------------------------------------------+
| Id |                               Addresses                                |
+----+------------------------------------------------------------------------+
|  1 | [address1, address2, address3, address4, address5, address6, address9] |
|  2 | [address7, address8]                                                   |
|  3 | [address10]                                                            |
+----+------------------------------------------------------------------------+

Answer 1:

Look into using treeReduce: https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/rdd/RDD.html#treeReduce(scala.Function2,%20int)

  • For the sequential operation, build up a set of sets:

    • For each new array of elements, e.g. [address7, address8], iterate through the existing sets and check whether the intersection is non-empty; if so, add those elements to that set.

    • Otherwise, create a new set containing those elements.

  • For the combine operation:

    • For each of the sets on the left side of the combine, iterate through all sets on the right side to find any with a non-empty intersection; if one is found, merge the two sets.

Note: the seqOp/combOp structure described above actually matches treeAggregate rather than treeReduce. treeReduce takes a single function over two values of the same type, while treeAggregate takes a zero value plus separate sequential and combine operations; both are available on RDD.
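A minimal sketch of this approach in Scala, using treeAggregate on the example df. The helper name mergeIntoSets is mine, not a Spark API; it folds one address set into a collection of pairwise-disjoint sets, which is what makes transitive links (e.g. address9 bridging Package 1's and Package 4's addresses via Package 5) come out as a single group:

import org.apache.spark.sql.SparkSession

object AddressGrouping {
  type Sets = Seq[Set[String]]

  // Fold one address set into a collection of pairwise-disjoint sets:
  // union it with every set it overlaps, keep the others untouched.
  def mergeIntoSets(acc: Sets, next: Set[String]): Sets = {
    val (overlapping, disjoint) = acc.partition(_.exists(next.contains))
    disjoint :+ overlapping.fold(next)(_ union _)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("grouping").master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("Package 1", Seq("address1", "address2", "address3")),
      ("Package 2", Seq("address3", "address4", "address5", "address6")),
      ("Package 3", Seq("address7", "address8")),
      ("Package 4", Seq("address9")),
      ("Package 5", Seq("address9", "address1")),
      ("Package 6", Seq("address10")),
      ("Package 7", Seq("address8"))).toDF("Package", "Destinations")

    val groups: Sets = df
      .select($"Destinations").as[Seq[String]]
      .rdd
      .map(_.toSet)
      .treeAggregate(Seq.empty[Set[String]])(
        seqOp = mergeIntoSets,                                        // sequential op within each partition
        combOp = (left, right) => right.foldLeft(left)(mergeIntoSets)) // combine results across partitions

    groups.zipWithIndex.foreach { case (addresses, i) =>
      println(s"${i + 1} -> ${addresses.toSeq.sorted.mkString("[", ", ", "]")}")
    }
    spark.stop()
  }
}

On the example data this produces the three groups from the expected output, though their order (and hence the ids) depends on partitioning.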