How to access a lookup (broadcast) RDD (or dataset) inside another RDD's transformation

Published 2020-08-09 06:42

Question:

I am new to Spark and Scala and have just started learning... I am using Spark 1.0.0 on CDH 5.1.3.

I have a broadcast lookup RDD named dbTableKeyValueMap: RDD[(String, String)], and I want to use it to process my fileRDD (each row has 300+ columns). This is the code:

val get = fileRDD.map { x =>
  val tmp = dbTableKeyValueMap.lookup(x)  // lookup is an RDD action, called here inside map
  tmp
}

Running this locally hangs, and/or after some time gives this error:

scala.MatchError: null
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)

I can understand that accessing one RDD inside another will have issues once locality and the size of the collection come into the picture. Taking a Cartesian product is not an option for me, because the records in the file RDD are huge (each row has 300+ columns).

In Hadoop MapReduce I would load dbTableKeyValueMap through the distributed cache in the setup method and then use it in the map method of my Java code; I want to do something similar in a Spark map, but I cannot find a simple example for this use case. I want to iterate over the fileRDD rows one by one and apply transformations, clean-ups, lookups, etc. to each column for further processing. Alternatively, is there a way to use dbTableKeyValueMap as a Scala collection instead of a Spark RDD?

Please help.

Answer 1:

Thanks... the easiest thing to do was to convert the lookup RDD into a plain Scala collection and broadcast that, and I was good to go! I can now access it inside transformations on any RDD:

val scalaMap = dbTableKeyValueMap.collectAsMap().toMap  // bring the lookup data to the driver
val broadCastLookupMap = sc.broadcast(scalaMap)         // ship it once to every executor

val get = fileRDD.map { x =>
  // Map#get returns Option[String]; .head throws if the key is missing,
  // so consider getOrElse(x, default) when keys may be absent
  val tmp = broadCastLookupMap.value.get(x).head
  tmp
}
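Note that this only works while the lookup table is small enough to be collected to the driver and held in memory on every executor; if it is not, the join approach in the answer below is the safer route.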

This easy solution should be documented somewhere for early learners. It took a while for me to figure it out...

Thanks for the help...



Answer 2:

"I can understand accessing one RDD inside another will have issues if locality and the size of the collection come into the picture"

Not really. It simply won't work: Spark doesn't support nested actions or transformations, so an RDD (broadcast or not) cannot be used to look up data from inside another RDD's transformation.

Typically you have three choices:

  • collect the RDD and broadcast it as a local variable (see: Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?)
  • use join (it looks like that is what you need here; see the sketch after this list):

    fileRDD.map(x => (x, null)).join(dbTableKeyValueMap)
    
  • use external storage accessible from all workers
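
A minimal sketch of the join option, under the names from the question (fileRDD: RDD[String] holding the keys to look up, dbTableKeyValueMap: RDD[(String, String)]); the null placeholder exists only to turn the file rows into a pair RDD so join can match on keys:

import org.apache.spark.SparkContext._  // pair-RDD operations (needed explicitly before Spark 1.3)
import org.apache.spark.rdd.RDD

// Turn each file row into a (key, placeholder) pair so it can be joined by key.
val keyed: RDD[(String, Null)] = fileRDD.map(x => (x, null))

// Inner join against the lookup RDD: keeps only keys present on both sides.
val joined: RDD[(String, (Null, String))] = keyed.join(dbTableKeyValueMap)

// Drop the placeholder, keeping each key paired with its looked-up value.
val resolved: RDD[(String, String)] = joined.mapValues { case (_, v) => v }

If each file row is a full record rather than a bare key, map it to (key, row) first and join on that instead.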