Some context can be found here. I have built a graph from tuples collected from a query on a Hive table; the tuples correspond to trade relations between countries. Because the graph is built from numeric ids, the vertices carry no country label. I want to study the degree distribution and get the names of the most connected countries. I tried two options:
- First: map the vertex ids back to the country names with idMapbis, inside the function that collects and prints the ten highest degrees.
- Second: add the country name as a label on the vertices of the graph itself.
In both cases I get the same error: Task not serializable.
Full code:
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line=>(line(2),line(3))) // country -> country
couples look like this: Array[(Any, Any)] = Array((MWI,MOZ), (WSM,AUS), (MDA,CRI), (KNA,HTI), (PER,ERI), (SWE,CUB),...
val idMap = sc.broadcast(couples
  .flatMap{case (x: String, y: String) => Seq(x, y)}
  .distinct
  .zipWithIndex
  .map{case (k, v) => (k, v.toLong)}
  .toMap)
val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
  .map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))})
val graph = Graph.fromEdgeTuples(edges, 1)
Built this way, the vertices look like (68,1), for example: a numeric id and the default attribute 1.
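To make the id mapping concrete, here is a minimal sketch of the same idea on plain Scala collections, without Spark. The names IdMapSketch, buildIdMap and toEdges are only for illustration, and the fourth sample couple is made up; the first three are the couples shown above.

```scala
object IdMapSketch {
  // First three couples come from the output above; the fourth is hypothetical,
  // added so that two edges share a vertex.
  val sampleCouples: Seq[(String, String)] =
    Seq(("MWI", "MOZ"), ("WSM", "AUS"), ("MDA", "CRI"), ("MWI", "AUS"))

  // Assign each distinct country code a numeric id, in order of first appearance.
  def buildIdMap(couples: Seq[(String, String)]): Map[String, Long] =
    couples
      .flatMap { case (x, y) => Seq(x, y) }
      .distinct
      .zipWithIndex
      .map { case (name, index) => (name, index.toLong) }
      .toMap

  // Translate (name, name) couples into (id, id) edge tuples.
  def toEdges(couples: Seq[(String, String)], idMap: Map[String, Long]): Seq[(Long, Long)] =
    couples.map { case (x, y) => (idMap(x), idMap(y)) }
}
```

The edge tuples only contain the numeric ids, which is why the country names are no longer visible on the vertices of the graph.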
val degrees: VertexRDD[Int] = graph.degrees.cache()
//Most connected vertices
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(Int, Int)] = {
  val namesAndDegrees = degrees.innerJoin(graph.vertices) {
    (id, degree, k) => (id.toInt, degree)
  }
  val ord = Ordering.by[(Int, Int), Int](_._2)
  namesAndDegrees.map(_._2).top(10)(ord)
}
topNamesAndDegrees(degrees, graph).foreach(println)
We get (id, degree) pairs: (79,1016),(64,912),(55,889)...
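What I am after is the equivalent of the following lookup, sketched here on plain Scala collections rather than on the RDDs. The names NameLookupSketch, invert and withNames are hypothetical, and the sketch assumes the top-10 result has already been brought back as a small local array.

```scala
object NameLookupSketch {
  // Invert a name -> id map into id -> name (ids are assumed to be unique).
  def invert(idMap: Map[String, Long]): Map[Long, String] =
    idMap.map { case (name, id) => (id, name) }

  // Replace each vertex id in a local (id, degree) array by its country name.
  // Ids missing from the map are kept visible instead of throwing.
  def withNames(top: Array[(Int, Int)], idToName: Map[Long, String]): Array[(String, Int)] =
    top.map { case (id, degree) =>
      (idToName.getOrElse(id.toLong, s"unknown($id)"), degree)
    }
}
```

For example, with made-up ids and degrees, `NameLookupSketch.withNames(Array((1, 7), (0, 3)), NameLookupSketch.invert(Map("MWI" -> 0L, "MOZ" -> 1L)))` gives the pairs (MOZ,7) and (MWI,3).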
First option to retrieve the names:
// Broadcast the index -> name map so that idMapbis.value(...) can be used below
val idMapbis = sc.broadcast(couples
  .flatMap{case (x: String, y: String) => Seq(x, y)}
  .distinct
  .zipWithIndex
  .map{case (k, v) => (v, k)}
  .toMap)
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(String, Int)] = {
  val namesAndDegrees = degrees.innerJoin(graph.vertices) {
    (id, degree, name) => (idMapbis.value(id.toInt), degree)
  }
  val ord = Ordering.by[(String, Int), Int](_._2)
  namesAndDegrees.map(_._2).top(10)(ord)
}
topNamesAndDegrees(degrees, graph).foreach(println)
This fails with Task not serializable, although idMapbis itself works: there is no error when I evaluate idMapbis.value(graph.vertices.take(1)(0)._1.toInt) on the driver.
Second option (label the vertices of the graph):
graph.vertices.map{case (k, v) => (k,idMapbis.value(k.toInt))}
This fails with Task not serializable again. For context, here is how topNamesAndDegrees is modified in this option to obtain the names of the most connected vertices:
// Here graph must be the relabelled graph, whose vertex attribute is the country name
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[String, Int]): Array[(String, Int)] = {
  val namesAndDegrees = degrees.innerJoin(graph.vertices) {
    (id, degree, name) => (name, degree)
  }
  val ord = Ordering.by[(String, Int), Int](_._2)
  namesAndDegrees.map(_._2).top(10)(ord)
}
topNamesAndDegrees(degrees, graph).foreach(println)
I would like to understand how to make one of these options work, or both if someone sees how.