Creating a Dataset based on different case classes

Posted 2019-08-28 04:48

Question:

This question already has an answer here:

  • Encode an ADT / sealed trait hierarchy into Spark DataSet column

Hi, I have an RDD that is created by reading a CSV file. I have defined a method that maps each line of the RDD to a different case class, based on an input parameter.

The returned RDD then needs to be converted to a DataFrame. When I try to run this, I get the error below.

The method is defined as:

  case class Australiafile1(sectionName: String, profitCentre: String, valueAgainst: String, Status: String)

  case class Australiafile2(sectionName: String, profitCentre: String)

  case class defaultclass(error: String)

  def mapper(line: String, recordLayoutClassToBeUsed: String) = {

    val fields = line.split(",")
    var outclass = recordLayoutClassToBeUsed match {
      case ("Australiafile1") => Australiafile1(fields(0), fields(1), fields(2), fields(3))
      case ("Australiafile2") => Australiafile2(fields(0), fields(1))
    }
    outclass

  }

The output of the method is used to create a DataFrame as below:

      val inputlines = spark.sparkContext.textFile(inputFile).cache().mapPartitionsWithIndex { (idx, lines) => if (idx == 0) lines.drop(numberOfLinesToBeRemoved.toInt) else lines }.cache()
      val records = inputlines.filter(x => !x.isEmpty).filter(x => x.split(",").length > 0).map(lines => mapper(lines, recordLayoutClassToBeUsed))

      import spark.implicits._

      val recordsDS = records.toDF()
      recordsDS.createTempView("recordtable")
      val output = spark.sql("select * from recordtable").toDF()
      output.write.option("delimiter", "|").option("header", "false").mode("overwrite").csv(outputFile)

The error received is as below:

Exception in thread "main" java.lang.NoClassDefFoundError: no Java class corresponding to Product with Serializable found
    at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1300)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:60)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)

Could you please advise what is wrong here, and how can I overcome it?

Answer 1:

Try:

trait AustraliaFile extends Serializable

case class Australiafile1(sectionName: String, profitCentre: String, valueAgainst: String, Status: String) extends AustraliaFile

case class Australiafile2(sectionName: String, profitCentre: String) extends AustraliaFile

Your classes are not Serializable, yet Spark can only write serializable objects. It is also always a good idea to base related classes on a common ancestor, so that you can declare your RDD as RDD[AustraliaFile] instead of RDD[Any].
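As a minimal sketch of that last point (assuming the trait and case classes above, and reusing inputlines and recordLayoutClassToBeUsed from the question), the mapped RDD can then carry an explicit element type:

import org.apache.spark.rdd.RDD

// Sketch: with the common ancestor in place, the element type can be pinned
// to AustraliaFile instead of widening to Any / Product with Serializable.
val records: RDD[AustraliaFile] =
  inputlines
    .filter(line => line.nonEmpty)
    .map(line => mapper(line, recordLayoutClassToBeUsed))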

Also, your class-matching logic can be simplified to:

def mapper(line: String, recordLayoutClassToBeUsed: String) = {
  val fields = line.split(",")
  recordLayoutClassToBeUsed match {
    case "Australiafile1" => Australiafile1(fields(0), fields(1), fields(2), fields(3))
    case "Australiafile2" => Australiafile2(fields(0), fields(1))
  }
}
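
As a small optional tightening (a sketch, not part of the original answer): annotating the mapper with the common supertype as its return type makes the compiler check that every branch yields an AustraliaFile, rather than relying on whatever least upper bound it would otherwise infer.

// Sketch assuming the AustraliaFile trait above; the explicit return type
// pins the result to the common supertype.
def mapper(line: String, recordLayoutClassToBeUsed: String): AustraliaFile = {
  val fields = line.split(",")
  recordLayoutClassToBeUsed match {
    case "Australiafile1" => Australiafile1(fields(0), fields(1), fields(2), fields(3))
    case "Australiafile2" => Australiafile2(fields(0), fields(1))
  }
}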