Spark error: Exception in thread "main" java.lang.UnsupportedOperationException

Posted 2019-08-16 15:48

Question:

I am writing a Scala/Spark program that finds the maximum salary of an employee. The employee data is available in a CSV file, and the salary column uses a comma as the thousands separator and is prefixed with a $ sign, e.g. $74,628.00.

To handle the comma and dollar sign, I have written a parser function in Scala that splits each line on "," and then maps each column to individual variables that are assigned to a case class.

My parser function looks like below. To eliminate the comma and dollar sign I use the replace function to replace them with an empty string, and then finally typecast the result to Int.

def ParseEmployee(line: String): Classes.Employee = {
    val fields = line.split(",")
    val Name = fields(0)
    val JOBTITLE = fields(2)
    val DEPARTMENT = fields(3)
    val temp = fields(4)

    temp.replace(",","")//To eliminate the ,
    temp.replace("$","")//To remove the $
    val EMPLOYEEANNUALSALARY = temp.toInt //Type cast the string to Int

    Classes.Employee(Name, JOBTITLE, DEPARTMENT, EMPLOYEEANNUALSALARY)
  }

My case class looks like below

case class Employee (Name: String,
                     JOBTITLE: String,
                     DEPARTMENT: String,
                     EMPLOYEEANNUALSALARY: Number)

My Spark DataFrame SQL query looks like below

val empMaxSalaryValue = sc.sqlContext.sql("Select Max(EMPLOYEEANNUALSALARY) From EMP")
empMaxSalaryValue.show

When I run this program I get the exception below

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for Number
- field (class: "java.lang.Number", name: "EMPLOYEEANNUALSALARY")
- root class: "Classes.Employee"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:625)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:619)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:607)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:607)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:438)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:282)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:272)
    at CalculateMaximumSalary$.main(CalculateMaximumSalary.scala:27)
    at CalculateMaximumSalary.main(CalculateMaximumSalary.scala)

  1. Any idea why I am getting this error? What mistake am I making here, and why is it not able to typecast to Number?

  2. Is there a better approach to this problem of getting the maximum salary of an employee?

Answer 1:

Spark SQL provides Encoders only for a limited set of concrete classes. Abstract classes like Number are not supported (at best they can be used with generic binary Encoders such as Kryo).

Since you convert to Int anyway, just redefine the class:

case class Employee (
  Name: String,
  JOBTITLE: String,
  DEPARTMENT: String,
  EMPLOYEEANNUALSALARY: Int
)
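
For completeness, here is a minimal, self-contained sketch of the redefined case class in use. The object name, the sample rows, and the parseSalary helper are illustrations of mine rather than code from the original post. The helper also captures the result of replace (String.replace returns a new string instead of modifying temp in place, so the two calls in the question's ParseEmployee have no effect) and converts via toDouble so the ".00" part of values like $74,628.00 does not break the conversion to Int.

import org.apache.spark.sql.SparkSession

object MaxSalaryExample {

  // Concrete Int instead of the abstract Number, as suggested above.
  case class Employee(
    Name: String,
    JOBTITLE: String,
    DEPARTMENT: String,
    EMPLOYEEANNUALSALARY: Int
  )

  // Strip the "$" prefix and thousands separators, capture the result
  // (String.replace does not modify the original string), and convert
  // via Double so the ".00" suffix does not break the cast to Int.
  def parseSalary(raw: String): Int =
    raw.replace("$", "").replace(",", "").toDouble.toInt

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MaxSalary").master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical rows standing in for the parsed CSV lines.
    val employees = Seq(
      Employee("Alice", "Engineer", "IT", parseSalary("$74,628.00")),
      Employee("Bob", "Analyst", "Finance", parseSalary("$61,200.00"))
    ).toDF()

    employees.createOrReplaceTempView("EMP")
    spark.sql("SELECT MAX(EMPLOYEEANNUALSALARY) FROM EMP").show()

    spark.stop()
  }
}

With the Int field the case-class Encoder is derived automatically, and the same MAX query from the question runs without the UnsupportedOperationException.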