Spark 2 Dataset Null value exception

Posted 2019-03-27 16:20

Question:

I'm getting this null value error from Spark Dataset.filter.

Input CSV:

name,age,stat
abc,22,m
xyz,,s

Working code:

case class Person(name: String, age: Long, stat: String)

val peopleDS = spark.read.option("inferSchema","true")
  .option("header", "true").option("delimiter", ",")
  .csv("./people.csv").as[Person]
peopleDS.show()
peopleDS.createOrReplaceTempView("people")
spark.sql("select * from people where age > 30").show()

Failing code (adding the following lines returns an error):

val filteredDS = peopleDS.filter(_.age > 30)
filteredDS.show()

It returns a null value error:

java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "age")
- root class: "com.gcp.model.Person"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

Answer 1:

The exception you get should explain everything, but let's go step by step:

  • When you load data using the csv data source, all fields are marked as nullable:

    val path: String = ???
    
    val peopleDF = spark.read
      .option("inferSchema","true")
      .option("header", "true")
      .option("delimiter", ",")
      .csv(path)
    
    peopleDF.printSchema
    
    root
    |-- name: string (nullable = true)
    |-- age: integer (nullable = true)
    |-- stat: string (nullable = true)
    
  • A missing field is represented as SQL NULL:

    peopleDF.where($"age".isNull).show
    
    +----+----+----+
    |name| age|stat|
    +----+----+----+
    | xyz|null|   s|
    +----+----+----+
    
  • Next you convert Dataset[Row] to Dataset[Person], which uses Long to encode the age field. Long in Scala cannot be null. Because the input schema is nullable, the output schema stays nullable despite that:

    val peopleDS = peopleDF.as[Person]
    
    peopleDS.printSchema
    
    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = true)
     |-- stat: string (nullable = true)
    

    Note that as[T] doesn't affect the schema at all.

  • When you query a Dataset using SQL (on a registered table) or the DataFrame API, Spark won't deserialize the objects. Since the schema is still nullable, we can execute:

    peopleDS.where($"age" > 30).show
    
    +----+---+----+
    |name|age|stat|
    +----+---+----+
    +----+---+----+
    

    without any issues. This is just plain SQL logic, and NULL is a valid value.
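
    This is standard SQL three-valued logic: NULL > 30 evaluates to NULL, and WHERE keeps only rows where the predicate is true, so the NULL row is silently dropped rather than causing an error. A quick sketch to see this directly:

    spark.sql("SELECT null > 30 AS result").show()
    // result is NULL, so a WHERE clause using this predicate drops the row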

  • When we use statically typed Dataset API:

    peopleDS.filter(_.age > 30)
    

    Spark has to deserialize the objects. Because Long cannot be null (SQL NULL), it fails with the exception you've seen.

    If it weren't for that check, you'd get an NPE.
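
    The same constraint can be seen in plain Scala, outside Spark: a primitive Long has no null representation, and unboxing a null boxed value is exactly the NPE Spark guards against. A minimal sketch (not Spark code):

    val boxed: java.lang.Long = null
    // val unboxed: Long = boxed               // would throw NullPointerException on unboxing
    val safe: Option[Long] = Option(boxed).map(_.longValue)  // None instead of an NPE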

  • The correct statically typed representation of your data should use optional types:

    case class Person(name: String, age: Option[Long], stat: String)
    

    with an adjusted filter function:

    peopleDS.filter(_.age.map(_ > 30).getOrElse(false))
    
    +----+---+----+
    |name|age|stat|
    +----+---+----+
    +----+---+----+
    
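    Equivalently, and a bit more idiomatically, you can use Option.exists, which returns false for None:

    peopleDS.filter(_.age.exists(_ > 30))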

    If you prefer, you can use pattern matching:

    peopleDS.filter {
      _.age match {
        case Some(age) => age > 30
        case _         => false   // or case None => false
      }
    }
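
    If you need to keep the original non-optional Person instead, another option (not covered in this answer) is to drop the NULL rows before converting, e.g. with na.drop on the age column:

    // assumes Person still declares age: Long
    val strictDS = peopleDF.na.drop(Seq("age")).as[Person]
    strictDS.filter(_.age > 30).show()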
    

    Note that you don't have to use optional types for name and stat (though it would be recommended anyway). Because a Scala String is just a Java String, it can be null. Of course, if you go with this approach you have to explicitly check whether accessed values are null or not.
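
    For example, if name stays a plain String, a typed filter should guard against null explicitly (a sketch, assuming the case class above):

    peopleDS.filter(p => p.name != null && p.name.startsWith("a"))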

Related: Spark 2.0 Dataset vs DataFrame