I have the following two DataFrames in Spark 2.2 and Scala 2.11. The DataFrame edges defines the edges of a directed graph, while the DataFrame types defines the type of each node.
edges =
+----+---+----+
|from| to|attr|
+----+---+----+
|   1|  0|   1|
|   1|  4|   1|
|   2|  2|   1|
|   4|  3|   1|
|   4|  5|   1|
+----+---+----+
types =
+------+----+
|nodeId|type|
+------+----+
|     0|   0|
|     1|   0|
|     2|   2|
|     3|   4|
|     4|   4|
|     5|   4|
+------+----+
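To make the example reproducible, the two input DataFrames can be built from local Scala collections. This is a sketch assuming a SparkSession is available. The object name SampleData and the helper load are hypothetical, and the master URL and app name are arbitrary (in spark-shell the session already exists as spark).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SampleData {
  // Builds the two example DataFrames; the values are copied from the tables above
  def load(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._

    // Directed edges: from -> to, plus an attribute column
    val edges = Seq((1, 0, 1), (1, 4, 1), (2, 2, 1), (4, 3, 1), (4, 5, 1))
      .toDF("from", "to", "attr")

    // Type of every node in the graph
    val types = Seq((0, 0), (1, 0), (2, 2), (3, 4), (4, 4), (5, 4))
      .toDF("nodeId", "type")

    (edges, types)
  }

  def main(args: Array[String]): Unit = {
    // Local session for experimenting; master and app name are arbitrary choices
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("same-type-out-edges")
      .getOrCreate()
    val (edges, types) = load(spark)
    edges.show()
    types.show()
    spark.stop()
  }
}
```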
For each node, I want to know the number of edges to nodes of the same type. Note that I only want to count the edges outgoing from a node, since I am dealing with a directed graph.
To achieve this, I joined the two DataFrames:
val graphDF = edges
  // attach the type of the source node
  .join(types, types("nodeId") === edges("from"), "left")
  .drop("nodeId")
  .withColumnRenamed("type", "type_from")
  // attach the type of the target node
  .join(types, types("nodeId") === edges("to"), "left")
  .drop("nodeId")
  .withColumnRenamed("type", "type_to")
This gives the following new DataFrame graphDF:
+----+---+----+---------+-------+
|from| to|attr|type_from|type_to|
+----+---+----+---------+-------+
|   1|  0|   1|        0|      0|
|   1|  4|   1|        0|      4|
|   2|  2|   1|        2|      2|
|   4|  3|   1|        4|      4|
|   4|  5|   1|        4|      4|
+----+---+----+---------+-------+
Now I need to get the following final result:
+------+--------+----+
|nodeId|numLinks|type|
+------+--------+----+
|     0|       0|   0|
|     1|       1|   0|
|     2|       0|   2|
|     3|       0|   4|
|     4|       2|   4|
|     5|       0|   4|
+------+--------+----+
I was thinking about using groupBy and agg(count(...)), but I do not know how to deal with directed edges.
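One way to use groupBy and agg(count(...)) here is to start from types rather than from edges, so every node is present, and to count only the rows that satisfy the condition. This is a sketch assuming graphDF has the columns shown above. The object ConditionalCount is hypothetical, and excluding self-loops is an assumption made to match the expected result (node 2 gets 0).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, when}

object ConditionalCount {
  // graphDF: from, to, attr, type_from, type_to; types: nodeId, type
  def numLinks(graphDF: DataFrame, types: DataFrame): DataFrame = {
    // An edge counts toward its source node when it is not a self-loop
    // and both endpoints have the same type
    val sameType = col("from") =!= col("to") && col("type_from") === col("type_to")

    types
      // Left join keeps every node; nodes without outgoing edges get null edge columns
      .join(graphDF, col("nodeId") === col("from"), "left")
      .groupBy("nodeId", "type")
      // when(...) without otherwise(...) yields null when the condition is false or null,
      // and count() skips nulls, so nodes with no matching edge end up with 0
      .agg(count(when(sameType, true)).as("numLinks"))
      .select("nodeId", "numLinks", "type")
      .orderBy("nodeId")
  }
}
```

Referencing columns by name (rather than types("nodeId")) works here because the column names on the two sides of the join do not overlap.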
Update:
numLinks is the number of edges going out of a given node to nodes of the same type. For example, node 5 has no outgoing edges (only the incoming edge 4->5; see the DataFrame edges), and the same holds for node 0. Node 4, however, has two outgoing edges (4->3 and 4->5). Self-loops such as 2->2 are not counted, which is why node 2 has 0.
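The definition can be checked by hand with plain Scala collections, without Spark. This sketch hard-codes the sample data from the tables above; the object NumLinksReference is hypothetical, and excluding self-loops is an assumption inferred from the expected output (node 2 has the same-type self-loop 2->2 but numLinks 0) and from the from != to filter in the solution below.

```scala
object NumLinksReference {
  // Edges as (from, to) pairs and node types as nodeId -> type, copied from the tables above
  val edges: Seq[(Int, Int)] = Seq((1, 0), (1, 4), (2, 2), (4, 3), (4, 5))
  val types: Map[Int, Int] = Map(0 -> 0, 1 -> 0, 2 -> 2, 3 -> 4, 4 -> 4, 5 -> 4)

  // numLinks(n) = number of outgoing edges n -> m with m != n and type(m) == type(n)
  def numLinks(node: Int): Int =
    edges.count { case (from, to) =>
      from == node && from != to && types(from) == types(to)
    }

  def main(args: Array[String]): Unit =
    // Prints "nodeId numLinks type" for every node, sorted by nodeId
    types.keys.toSeq.sorted.foreach { n =>
      println(s"$n ${numLinks(n)} ${types(n)}")
    }
}
```

Running main prints 0 0 0, 1 1 0, 2 0 2, 3 0 4, 4 2 4 and 5 0 4, one per line, which mirrors the expected result table.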
My solution:
This is my solution, but it omits the nodes that have 0 links:
import org.apache.spark.sql.functions.count
graphDF.filter("from != to").filter("type_from == type_to").groupBy("from").agg(count("from") as "numLinks").show()
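The missing nodes can be recovered by left-joining the per-node counts back onto types and replacing the resulting nulls with 0. This is a sketch, assuming graphDF and types have the columns shown above; the object SameTypeLinks is hypothetical, and the aggregation itself keeps the same filters as the solution above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, count, lit}

object SameTypeLinks {
  // graphDF: from, to, attr, type_from, type_to (built as in the question)
  // types:   nodeId, type
  def numLinks(graphDF: DataFrame, types: DataFrame): DataFrame = {
    // Same aggregation as above: same-type edges only, self-loops excluded
    val counts = graphDF
      .filter(col("from") =!= col("to"))
      .filter(col("type_from") === col("type_to"))
      .groupBy("from")
      .agg(count(lit(1)).as("numLinks"))

    // Left-join the counts onto *all* nodes so nodes without a matching edge are kept;
    // their numLinks is null after the join and is replaced by 0
    types
      .join(counts, col("nodeId") === col("from"), "left")
      .select(
        col("nodeId"),
        coalesce(col("numLinks"), lit(0L)).as("numLinks"),
        col("type")
      )
      .orderBy("nodeId")
  }
}
```

count produces a LongType column, so the default is written as lit(0L) to keep the type consistent; df.na.fill(0, Seq("numLinks")) would be an equivalent way to replace the nulls.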