Creating dataset based on different case classes [duplicate]

Posted 2019-10-29 02:36

This question already has an answer here:

  • Encode an ADT / sealed trait hierarchy into Spark DataSet column (1 answer)

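Hi, I have an RDD that is created after reading a CSV file. I have defined a method that maps each line of the RDD to a different case class depending on an input parameter.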

The returned RDD needs to be converted to a DataFrame, but when I try to run this I get the error below.

The method is defined as follows:

  case class Australiafile1(sectionName: String, profitCentre: String, valueAgainst: String, Status: String)

  case class Australiafile2(sectionName: String, profitCentre: String)

  case class defaultclass(error: String)

  def mapper(line: String, recordLayoutClassToBeUsed: String) = {

    val fields = line.split(",")
    var outclass = recordLayoutClassToBeUsed match {
      case ("Australiafile1") => Australiafile1(fields(0), fields(1), fields(2), fields(3))
      case ("Australiafile2") => Australiafile2(fields(0), fields(1))
    }
    outclass

  }
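Why the error mentions Product with Serializable: the two branches of the match return unrelated case classes, so the compiler has to pick a common supertype for the method's result. The plain-Scala sketch below (no Spark; the wrapper object InferenceDemo is hypothetical) reproduces the mapper and shows that callers only see the value as Product with Serializable, which, judging by the stack trace, is the type Spark's newProductEncoder then fails to map to a Java class.

```scala
// Plain Scala (no Spark needed): shows the static type the question's mapper returns.
object InferenceDemo {
  case class Australiafile1(sectionName: String, profitCentre: String, valueAgainst: String, Status: String)
  case class Australiafile2(sectionName: String, profitCentre: String)

  // No declared return type, so the compiler infers the least upper bound of
  // the two branches. Every case class extends Product and Serializable, so in
  // Scala 2 that bound is Product with Serializable.
  def mapper(line: String, recordLayoutClassToBeUsed: String) = {
    val fields = line.split(",")
    recordLayoutClassToBeUsed match {
      case "Australiafile1" => Australiafile1(fields(0), fields(1), fields(2), fields(3))
      case "Australiafile2" => Australiafile2(fields(0), fields(1))
    }
  }

  def main(args: Array[String]): Unit = {
    // Statically, callers only see a Product with Serializable ...
    val r: Product with Serializable = mapper("A,B,C,D", "Australiafile1")
    // ... although at runtime the value is still the concrete case class.
    println(r.productPrefix) // prints Australiafile1
  }
}
```

The runtime value is fine; the problem is that an encoder is chosen from the static element type of the RDD, and that type is only the compound supertype.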

The output of this method is used to create a DataFrame as follows:

      val inputlines = spark.sparkContext.textFile(inputFile).cache().mapPartitionsWithIndex { (idx, lines) => if (idx == 0) lines.drop(numberOfLinesToBeRemoved.toInt) else lines }.cache()
      val records = inputlines.filter(x => !x.isEmpty).filter(x => x.split(",").length > 0).map(lines => mapper(lines, recordLayoutClassToBeUsed))

      import spark.implicits._

      val recordsDS = records.toDF()
      recordsDS.createTempView("recordtable")
      val output = spark.sql("select * from recordtable").toDF()
      output.write.option("delimiter", "|").option("header", "false").mode("overwrite").csv(outputFile)

The error received is:

  Exception in thread "main" java.lang.NoClassDefFoundError: no Java class corresponding to Product with Serializable found
      at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1300)
      at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
      at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
      at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:60)
      at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
      at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
      at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)

Could you please advise what is wrong here and how I can overcome this?

Answer 1:

Try:

trait AustraliaFile extends Serializable

case class Australiafile1(sectionName: String, profitCentre: String, valueAgainst: String, Status: String) extends AustraliaFile

case class Australiafile2(sectionName: String, profitCentre: String) extends AustraliaFile

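Your classes are not Serializable, but Spark can only write serializable objects. Also, it is always a good idea to base related classes on a common ancestor, so that you can declare your RDD as RDD[AustraliaFile] instead of RDD[Any].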
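Two details are worth keeping in mind with this suggestion. Case classes already extend Serializable (and Product) automatically, so what the common ancestor really changes is the static type, and only if the mapper declares it as its return type. The sketch below assumes a sealed trait and reuses the question's otherwise unused defaultclass as a fallback; the wrapper object TypedMapper is hypothetical. As far as I know, Spark's built-in implicit encoders still cannot derive an encoder for a trait, so records.toDF() on an RDD[AustraliaFile] would then fail at compile time for lack of an implicit Encoder instead of at runtime; the linked duplicate covers encoding such hierarchies.

```scala
// Hypothetical sketch: a sealed common ancestor plus an explicit return type.
object TypedMapper {
  sealed trait AustraliaFile extends Serializable

  case class Australiafile1(sectionName: String, profitCentre: String, valueAgainst: String, Status: String) extends AustraliaFile
  case class Australiafile2(sectionName: String, profitCentre: String) extends AustraliaFile
  // Reuses the question's defaultclass as a fallback for unknown layouts.
  case class defaultclass(error: String) extends AustraliaFile

  // The ": AustraliaFile" annotation pins the static type; without it the
  // compiler would infer a compound type that merely conforms to AustraliaFile.
  def mapper(line: String, recordLayoutClassToBeUsed: String): AustraliaFile = {
    val fields = line.split(",")
    recordLayoutClassToBeUsed match {
      case "Australiafile1" => Australiafile1(fields(0), fields(1), fields(2), fields(3))
      case "Australiafile2" => Australiafile2(fields(0), fields(1))
      // Without a catch-all case, an unknown layout name throws scala.MatchError.
      case other => defaultclass(s"unknown record layout: $other")
    }
  }
}
```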

Also, your class-matching logic can be simplified to:

def mapper(line: String, recordLayoutClassToBeUsed: String) = {
  val fields = line.split(",")
  recordLayoutClassToBeUsed match {
    case ("Australiafile1") => Australiafile1(fields(0), fields(1), fields(2), fields(3))
    case ("Australiafile2") => Australiafile2(fields(0), fields(1))
  }
}
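If a single DataFrame has to hold every layout, one workaround (not part of the original answer) is to map all layouts onto one concrete case class, since Spark can derive an encoder for a case class whose fields are Strings and Options. The names FlatMapper, AustraliaRecord and the layout column below are hypothetical; this is a sketch under that assumption, not the linked duplicate's approach.

```scala
// Hypothetical sketch: one concrete case class for every layout, so the RDD's
// element type is a plain case class. Columns a layout lacks become None.
object FlatMapper {
  case class AustraliaRecord(
      layout: String,
      sectionName: String,
      profitCentre: String,
      valueAgainst: Option[String],
      status: Option[String])

  def mapper(line: String, recordLayoutClassToBeUsed: String): AustraliaRecord = {
    val fields = line.split(",")
    recordLayoutClassToBeUsed match {
      case "Australiafile1" =>
        AustraliaRecord("Australiafile1", fields(0), fields(1), Some(fields(2)), Some(fields(3)))
      case "Australiafile2" =>
        AustraliaRecord("Australiafile2", fields(0), fields(1), None, None)
      // Fail fast with a clear message instead of a bare scala.MatchError.
      case other =>
        throw new IllegalArgumentException(s"Unknown record layout: $other")
    }
  }
}
```

With import spark.implicits._ in scope, an RDD built with this mapper is an RDD[AustraliaRecord], so records.toDF() can derive a product encoder, and the None values become nulls. The trade-off is that Australiafile2 rows carry two always-null columns, which would also appear as empty fields in the CSV output.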


Source: Creating dataset based on different case classes [duplicate]