Calculate links between nodes using Spark

Published 2019-08-16 11:37

Question:

I have the following two DataFrames in Spark 2.2 and Scala 2.11. The DataFrame edges defines the edges of a directed graph, while the DataFrame types defines the type of each node.

edges =

+-----+-----+----+
|from |to   |attr|
+-----+-----+----+
|    1|    0|   1|
|    1|    4|   1|
|    2|    2|   1|
|    4|    3|   1|
|    4|    5|   1|
+-----+-----+----+

types =
+------+---------+
|nodeId|type     |
+------+---------+
|     0|        0|
|     1|        0|
|     2|        2|
|     3|        4|
|     4|        4|
|     5|        4|
+------+---------+

For each node, I want to know the number of edges to nodes of the same type. Please note that I only want to count the edges outgoing from a node, since I am dealing with a directed graph.

To do this, I joined the two DataFrames:

// Attach the type of the source node ("from") to every edge
val graphDF = edges
  .join(types, types("nodeId") === edges("from"), "left")
  .drop("nodeId")
  .withColumnRenamed("type", "type_from")
  // ...then attach the type of the target node ("to")
  .join(types, types("nodeId") === edges("to"), "left")
  .drop("nodeId")
  .withColumnRenamed("type", "type_to")

I obtained the following new DataFrame graphDF:

+-----+-----+----+---------------+---------------+
|from |to   |attr|type_from      |type_to        |
+-----+-----+----+---------------+---------------+
|    1|    0|   1|              0|              0|
|    1|    4|   1|              0|              4|
|    2|    2|   1|              2|              2|
|    4|    3|   1|              4|              4|
|    4|    5|   1|              4|              4|
+-----+-----+----+---------------+---------------+

Now I need to get the following final result:

+------+---------+---------+
|nodeId|numLinks |type     |
+------+---------+---------+
|     0|        0|        0| 
|     1|        1|        0|
|     2|        0|        2|
|     3|        0|        4|
|     4|        2|        4|
|     5|        0|        4| 
+------+---------+---------+

I was thinking about using groupBy and agg(count(...)), but I do not know how to deal with the direction of the edges.

Update:

numLinks is the number of edges going out of a given node to other nodes of the same type. For example, node 5 has no outgoing edges (only the incoming edge 4->5, see the DataFrame edges). The same applies to node 0. Node 4, however, has two outgoing edges (4->3 and 4->5).
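To pin down the counting rule, here is a small plain-Scala sketch on local collections (no Spark involved). The names edges, types and numLinks simply mirror the DataFrames above and are assumptions for illustration. For every node it counts outgoing edges whose target is a different node of the same type, so the self-loop 2->2 is excluded, which is what the expected table implies.

```scala
object NumLinksDefinition {
  // (from, to) pairs taken from the edges DataFrame; attr is ignored here
  val edges: Seq[(Int, Int)] = Seq((1, 0), (1, 4), (2, 2), (4, 3), (4, 5))

  // nodeId -> type, taken from the types DataFrame
  val types: Map[Int, Int] = Map(0 -> 0, 1 -> 0, 2 -> 2, 3 -> 4, 4 -> 4, 5 -> 4)

  // For each node, count outgoing edges to a *different* node of the same type.
  // Iterating over all node ids means nodes without such edges get 0.
  val numLinks: Map[Int, Int] = types.map { case (node, nodeType) =>
    node -> edges.count { case (from, to) =>
      from == node && to != node && types.get(to).contains(nodeType)
    }
  }

  def main(args: Array[String]): Unit =
    numLinks.toSeq.sorted.foreach { case (node, n) =>
      println(s"node $node -> $n link(s), type ${types(node)}")
    }
}
```

Because the loop runs over every node rather than over the edges, zero counts come out naturally; a groupBy over edges only sees nodes that actually have matching edges.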

My solution:

This is my solution, but it is missing the nodes that have 0 links:

graphDF.filter("from != to").filter("type_from == type_to").groupBy("from").agg(count("from") as "numLinks").show()

Answer 1:

You can filter, aggregate by id and type, and then add the missing nodes using types:

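import org.apache.spark.sql.functions.count
import spark.implicits._ // assumes a SparkSession named spark is in scope

val graphDF = Seq(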
  (1, 0, 1, 0, 0), (1, 4, 1, 0, 4), (2, 2, 1, 2, 2),
  (4, 3, 1, 4, 4), (4, 5, 1, 4, 4)
).toDF("from", "to", "attr", "type_from", "type_to")

val types = Seq(
  (0, 0), (1, 0), (2, 2), (3, 4), (4,4), (5, 4)
).toDF("nodeId", "type")

graphDF
  // I want to know the number of edges to the nodes of the same type
  .where($"type_from" === $"type_to")
  // I only want to count the edges outgoing from a node,
  .groupBy($"from" as "nodeId", $"type_from" as "type")
  .agg(count("*") as "numLinks")
  // but it lacks those nodes that have 0 links.
  .join(types, Seq("nodeId", "type"), "rightouter")
  .na.fill(0)

// +------+----+--------+
// |nodeId|type|numLinks|
// +------+----+--------+
// |     0|   0|       0|
// |     1|   0|       1|
// |     2|   2|       1|
// |     3|   4|       0|
// |     4|   4|       2|
// |     5|   4|       0|
// +------+----+--------+

To skip self-links, add $"from" =!= $"to" to the filter condition:

graphDF
  .where($"type_from" === $"type_to" && $"from" =!= $"to")
  .groupBy($"from" as "nodeId", $"type_from" as "type")
  .agg(count("*") as "numLinks")
  .join(types, Seq("nodeId", "type"), "rightouter")
  .na.fill(0)

// +------+----+--------+
// |nodeId|type|numLinks|
// +------+----+--------+
// |     0|   0|       0|
// |     1|   0|       1|
// |     2|   2|       0|
// |     3|   4|       0|
// |     4|   4|       2|
// |     5|   4|       0|
// +------+----+--------+
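The answer's pipeline (filter, group by source node, right-outer-join against every node, fill gaps with 0) can be mirrored on plain Scala collections to see why the join step is what brings back the zero-link nodes. This is an illustrative sketch, not Spark code; the object RightOuterFillSketch, the linkCounts helper and its skipSelfLinks flag are hypothetical names.

```scala
object RightOuterFillSketch {
  // (from, to, type_from, type_to), i.e. graphDF without the attr column
  val graphRows: Seq[(Int, Int, Int, Int)] = Seq(
    (1, 0, 0, 0), (1, 4, 0, 4), (2, 2, 2, 2), (4, 3, 4, 4), (4, 5, 4, 4)
  )

  // (nodeId, type), as in the types DataFrame
  val types: Seq[(Int, Int)] = Seq((0, 0), (1, 0), (2, 2), (3, 4), (4, 4), (5, 4))

  // Hypothetical helper mirroring: where -> groupBy/count -> rightouter join -> na.fill(0)
  def linkCounts(skipSelfLinks: Boolean): Seq[(Int, Int, Long)] = {
    // 1. keep same-type edges, optionally dropping self-loops
    val kept = graphRows.filter { case (from, to, tFrom, tTo) =>
      tFrom == tTo && (!skipSelfLinks || from != to)
    }
    // 2. group by (source node, source type) and count; only keys with at
    //    least one surviving edge appear, which is why nodes 0, 3, 5 vanish
    val counted: Map[(Int, Int), Long] =
      kept.groupBy { case (from, _, tFrom, _) => (from, tFrom) }
        .map { case (key, rows) => key -> rows.size.toLong }
    // 3. "right outer join" with all nodes: every (nodeId, type) is kept and
    //    a missing count defaults to 0, like na.fill(0)
    types.map { case (nodeId, t) => (nodeId, t, counted.getOrElse((nodeId, t), 0L)) }
  }

  def main(args: Array[String]): Unit = {
    println(linkCounts(skipSelfLinks = false)) // node 2 keeps its self-link
    println(linkCounts(skipSelfLinks = true))  // node 2 drops to 0
  }
}
```

Counts are Long here because Spark's count("*") also yields a bigint column. Unlike this sketch, which follows the order of types, the row order of a Spark join result is not guaranteed, so add an orderBy if the output order matters.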