I have a JSON file like below:
{"name":"method2","name1":"test","parameter1":"C:/Users/test/Desktop/Online.csv","parameter2": 1.0}
I am loading my JSON file:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/data.json")
val df1 = df.select($"name", $"parameter1", $"parameter2")
df1.show()
I have 3 functions like below:
import org.apache.spark.sql.functions._

def method1(P1: String, P2: Double): Unit = {
  val data = spark.read.option("header", true).csv(P1)
  val rs = data.select("CID", "Sc").dropDuplicates("CID", "Sc").withColumn("Rat", lit(P2))
  val outPutPath = "C:/Users/test/Desktop/output"
  rs.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(outPutPath)
}

def method2(P1: String, P2: Double): Unit = {
  val data = spark.read.option("header", true).csv(P1)
  val rs = data.select("CID", "Sc").withColumn("r", lit(P2))
  val rs1 = rs.filter($"CID" =!= "").groupBy("CID", "Sc").agg(sum(rs("r")).alias("R"))
  val outPutPath = "C:/Users/test/Desktop/output"
  rs1.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(outPutPath)
}

def methodn(P1: String, P2: Double): Unit = {
  println("methodn printing")
  println(P2)
}
I am trying to call the above functions using the below code:
df1.map(row => (row.getString(0), row.getString(1), row.getDouble(2))).foreach { x =>
  x._1.trim.toLowerCase match {
    case "method1" => method1(x._2, x._3)
    case "method2" => method2(x._2, x._3)
    case _ => methodn(x._2, x._3)
  }
}
Based on my JSON object it should call method2, but when I try to execute the above code I get the below error:
17/11/22 16:15:44 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6)
java.lang.NullPointerException
at $line36.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.method2(<console>:24)
at $line38.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:40)
at $line38.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/11/22 16:15:44 ERROR TaskSetManager: Task 0 in stage 6.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost, executor driver): java.lang.NullPointerException
at method2(<console>:24)
at $anonfun$2.apply(<console>:40)
at $anonfun$2.apply(<console>:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2325)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
... 54 elided
Caused by: java.lang.NullPointerException
at method2(<console>:24)
Please help me resolve this issue.
You are getting the NullPointerException because you are trying to access the SparkSession (spark) inside the functions (method1 and method2). The functions themselves are not the actual issue, though. The main issue is that you are calling them from inside the map/foreach over the dataframe: the code you pass there is serialized and run on the executors, and driver-side objects such as the SparkSession cannot be used from within those distributed closures. Since all the functions are being called inside such a closure, the spark variable they use is null on the executors, and that is the reason for the NullPointerException.
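For example, even this stripped-down version fails the same way, because the body of foreach runs on the executors, where spark is unusable (a minimal illustration of the failure mode, not part of your original code):

df1.foreach { row =>
  // this closure executes on an executor: `spark` is null here
  spark.read.option("header", true).csv(row.getString(1))
}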
The solution is to call the functions from a place where the spark variable can be accessed, i.e. on the driver, and not from within a distributed closure. Replacing the map/foreach pipeline with an action that first brings the rows back to the driver does the trick, as in the sketch below.
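Here is a minimal sketch of the fix, reusing df1 and the three methods from the question: collect() returns the rows as a local array, so the pattern match and the spark.read calls inside the methods all run on the driver, where the SparkSession exists.

df1.collect().foreach { row =>
  val name = row.getString(0)
  val p1 = row.getString(1)
  val p2 = row.getDouble(2)

  name.trim.toLowerCase match {
    case "method1" => method1(p1, p2)
    case "method2" => method2(p1, p2)
    case _ => methodn(p1, p2)
  }
}

Note that collect() materializes every row on the driver, which is fine for a small control file like this one; for a large table you could iterate with df1.toLocalIterator() instead. I hope the answer is helpful.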