Hi, I have an RDD built by reading a CSV file. I have defined a method that maps the lines of the RDD to different case classes based on an input parameter.
The returned RDD needs to be converted to a dataframe, but when I try to do so I get the error below.
The method is defined as:
case class Australiafile1(sectionName: String, profitCentre: String, valueAgainst: String, Status: String)
case class Australiafile2(sectionName: String, profitCentre: String)
case class defaultclass(error: String)

def mapper(line: String, recordLayoutClassToBeUsed: String) = {
  val fields = line.split(",")
  var outclass = recordLayoutClassToBeUsed match {
    case ("Australiafile1") => Australiafile1(fields(0), fields(1), fields(2), fields(3))
    case ("Australiafile2") => Australiafile2(fields(0), fields(1))
  }
  outclass
}
The output of the method is used to create a dataframe, as shown below:
val inputlines = spark.sparkContext.textFile(inputFile).cache()
  .mapPartitionsWithIndex { (idx, lines) =>
    if (idx == 0) lines.drop(numberOfLinesToBeRemoved.toInt) else lines
  }.cache()
val records = inputlines
  .filter(x => !x.isEmpty)
  .filter(x => x.split(",").length > 0)
  .map(lines => mapper(lines, recordLayoutClassToBeUsed))

import spark.implicits._

val recordsDS = records.toDF()
recordsDS.createTempView("recordtable")
val output = spark.sql("select * from recordtable").toDF()
output.write.option("delimiter", "|").option("header", "false").mode("overwrite").csv(outputFile)
The error received is:
Exception in thread "main" java.lang.NoClassDefFoundError: no Java class corresponding to Product with Serializable found
    at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1300)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:60)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)
Could you please advise what is wrong here and how I can overcome it?
Your two case classes have no common ancestor, so the compiler infers the result type of the match expression as Product with Serializable, an intersection type that has no corresponding Java class; Spark therefore cannot derive an encoder for it, which is exactly what the error says. It's always a good idea to base related classes on a common ancestor, so that you can declare your RDD as RDD[AustraliaFile] instead of RDD[Any].
Also, your class matching logic can be simplified.
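The answer's code snippet is not shown above, so here is a hedged sketch of what the suggested common-ancestor approach might look like. The trait name AustraliaFile comes from the answer; the field layouts come from the question; everything else is an assumption:

```scala
// Sketch: give the case classes a common sealed ancestor so the mapper
// has a single concrete return type instead of Product with Serializable.
sealed trait AustraliaFile extends Serializable

case class Australiafile1(sectionName: String, profitCentre: String,
                          valueAgainst: String, Status: String) extends AustraliaFile

case class Australiafile2(sectionName: String, profitCentre: String) extends AustraliaFile

def mapper(line: String, recordLayoutClassToBeUsed: String): AustraliaFile = {
  val fields = line.split(",")
  // A match expression is itself a value, so the intermediate var is not needed.
  recordLayoutClassToBeUsed match {
    case "Australiafile1" => Australiafile1(fields(0), fields(1), fields(2), fields(3))
    case "Australiafile2" => Australiafile2(fields(0), fields(1))
  }
}
```

With this hierarchy the mapped RDD is typed as RDD[AustraliaFile] rather than RDD[Any]. Note that Spark's toDF() still needs a concrete Product type to derive an encoder, so you may additionally need to narrow the RDD to a single case class (e.g. via collect { case r: Australiafile1 => r }) before converting it to a dataframe.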