How to use RDD in other RDDs map method?

2019-05-09 21:44发布

问题:

I got a rdd named index: RDD[(String, String)], I want to use index to deal with my file. This is the code:

val get = file.map({x =>
  val tmp = index.lookup(x).head
  tmp
})

The question is that I can not use index in the file.map Function, I ran this program and it gave me feedback like this:

14/12/11 16:22:27 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 602, spark2): scala.MatchError: null
        org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:770)
        com.ynu.App$$anonfun$12.apply(App.scala:270)
        com.ynu.App$$anonfun$12.apply(App.scala:265)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        scala.collection.AbstractIterator.to(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1080)
        org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1080)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

I don't know why. And if I want to implement this function what can I do? Thanks

回答1:

You should see RDDs as virtual collections. The RDD reference, only points to where the data is, in itself it has no data, so there's no point on using it in a closure.

You will need to use functions that combine RDDs together in order to achieve the desired functionality. Also, lookup as defined here is a very sequential process that requires all the lookup data available in the memory of each worker - this will not scale up.

To resolve all elements of the file rdd that to their value in index you should join both RDDs:

val resolvedFileRDD = file.keyBy(identity).join(index) // this will have the form of (key, (key,index of key))