I have a lookup RDD of size 6000, lookup_rdd: RDD[String]:
a1
a2
a3
a4
a5
...
and another RDD, data_rdd: RDD[(String, Iterable[(String, Int)])], i.e. (id, (item, count)) pairs, where the ids are unique:
(id1,List((a1,2), (a3,4)))
(id2,List((a2,1), (a4,2), (a1,1)))
(id3,List((a5,1)))
For each element in lookup_rdd I want to check whether each id has that element; if it does, I put the count, and if not, I put 0, and then store the result in a file.
What is an efficient way to achieve this? Is hashing possible? For example, the output I want is:
id1,2,0,4,0,0
id2,1,1,0,2,0
id3,0,0,0,0,1
I have tried this:
val headers = lookup_rdd.zipWithIndex().persist()
val indexing = data_rdd.map { line =>
  val id = line._1
  val item_cnt_list = line._2
  val arr = Array.fill[Byte](6000)(0)
  item_cnt_list.map(c => (headers.lookup(c._1), c._2))
}
indexing.collect().foreach(println)
I get the exception:
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations
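I suspect the problem is that headers is itself an RDD, so headers.lookup cannot be called inside data_rdd.map. The direction I am considering instead is to collect the small lookup into a plain Map on the driver and broadcast it, roughly like the sketch below. This is only a sketch: it assumes the ~6000-element lookup fits comfortably in driver memory, that sc is the SparkContext, and the output path is just a placeholder.

val headerIndex: Map[String, Int] =
  lookup_rdd.zipWithIndex().mapValues(_.toInt).collect().toMap   // item -> column index
val bHeaders = sc.broadcast(headerIndex)
val numHeaders = headerIndex.size

val lines = data_rdd.map { case (id, itemCounts) =>
  val arr = Array.fill[Int](numHeaders)(0)                       // one slot per lookup element
  itemCounts.foreach { case (item, cnt) =>
    bHeaders.value.get(item).foreach(idx => arr(idx) = cnt)      // set the count where the item exists
  }
  (id +: arr.map(_.toString)).mkString(",")                      // e.g. "id1,2,0,4,0,0"
}
lines.saveAsTextFile("output_path")                              // hypothetical path

Would something along these lines be the efficient way to do it, or is there a better approach?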