我使用GraphX处理上引发一些图形数据。 输入数据被给定为RDD[(String, String)]
。 我用下面的代码片断映射String
到VertexId
和建立图。
val input: RDD[(String, String)] = ...
val vertexIds = input.map(_._1)
.union(input.map(_._2))
.distinct()
.zipWithUniqueId()
.cache()
val edges = input.join(vertexIds)
.map { case (u, (v, uid)) => (v, uid) }
.join(vertexIds)
.map { case (v, (uid, vid)) => Edge(uid, vid, 1) }
val graph = Graph(vertexIds.map { case (v, vid) => (vid, v) }, edges )
当我做了抽查,看看顶部1000度最高的节点,我发现GraphX的结果是从原来的输入不同。 以下是我甩了高度的节点
graph.outerJoinVertices(graph.outDegrees) {
(_, vdata, deg) => (deg.getOrElse(0L), vdata)
}.vertices.map(_._2).top(1000).saveTo(....)
我怀疑.zipWithUniqueId
给出了不同的评价不稳定IDS。 我试过了
- 插入
vertexIds.count()
来强制物化,使vertexIds
没有得到重新评估。 - 插入
.sortBy(...).zipWithUniqueId()
以确保顺序是一样的。
他们都没有解决问题。 前1000度节点的结果每次运行略有不同。