Calling function inside RDD map function in Spark

2019-07-20 08:10发布

问题:

I was testing a simple string parser function defined by me in my code, but one of the worker nodes always fails at execution time. Here is the dummy code that I've been testing:

/* JUST A SIMPLE PARSER TO CLEAN PARENTHESIS */
def parseString(field: String): String = {
    val Pattern = "(.*.)".r
    field match{
        case "null" => "null"
        case Pattern(field) => field.replace('(',' ').replace(')',' ').replace(" ", "")
    }
}

/* CREATE TWO DISTRIBUTED RDDs TO JOIN THEM */
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)), 6)
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)), 6)
val manipulated_emp = emp.keyBy(t => t._3)
val manipulated_dept = dept.keyBy(t => t._2)
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept)

/* OUTPUT */
left_outer_join_data.collect.foreach(println)
/*
(30,((3,matt,30),Some((hive,30))))
(30,((5,rhonda,30),Some((hive,30))))
(20,((2,ricky,20),Some((spark,20))))
(10,((1,jordan,10),Some((hadoop,10))))
(35,((4,mince,35),None))
*/

val res = left_outer_join_data
.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString))
.collect

res
.map(f => ( f._1, f._2, parseString(f._3)))
.foreach(println)

/* DESIRED OUTPUT */
/*
(3,matt,hive,30)
(5,rhonda,hive,30)
(2,ricky,spark,20)
(1,jordan,hadoop,10)
(4,mince,null)
*/

This code works if I collect the results of res in the driver first. Since this is a testing, there is no problem doing that, but my actual application would deal with millions of rows and collecting results in the driver is discouraged. So if I do the same without collecting it first, like this:

val res = left_outer_join_data
.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString))

res
.map(f => ( f._1, f._2, parseString(f._3)))
.foreach(println)

I get the following:

ERROR TaskSetManager: Task 5 in stage 17.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 17.0 failed 4 times, most recent failure: Lost task 5.3 in stage 17.0 (TID 166, 192.168.28.101, executor 1): java.lang.NoClassDefFoundError: Could not initialize class tele.com.SimcardMsisdn$
        at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
        at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
        at tele.com.SimcardMsisdn$.main(SimcardMsisdn.scala:249)
        at tele.com.SimcardMsisdn.main(SimcardMsisdn.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class tele.com.SimcardMsisdn$
        at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
        at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Why Spark fails to execute my parser on the nodes ? Could you please recommend a solution or workaround ?

UPDATE

I found the solution to this problem (posted below), nonetheless, I'm still confused about this issue, maybe is something that I'm doing wrong.

回答1:

Well, I've managed to solve it myself by broadcasting the Pattern variable to the workers:

val Pattern = sc.broadcast("(.*.)".r)

and doing the pattern matching within the map, not in a function, and without collecting to the driver:

val res = left_outer_join_data.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString))
res.map(f => (f._1, f._2, f._3 match {
        case "null" => "null"
        case Pattern.value(f._3) => f._3.replace('(',' ').replace(')',' ').replace(" ", "")})
    )
.foreach(println)

Then I got the desired output from the worker stdout:

(3,matt,hive,30)
(5,rhonda,hive,30)
(2,ricky,spark,20)
(1,jordan,hadoop,10)
(4,mince,null)