How to validate the contents of a Spark DataFrame

Posted 2019-03-16 22:54

Question:

I have the Scala Spark code below, which works well but should not.

The second column contains data of mixed types, whereas in the schema I have defined it as IntegerType. My actual program has over 100 columns and keeps deriving multiple child DataFrames through transformations.

How can I validate that the fields of an RDD or DataFrame contain values of the correct data type, and then either ignore invalid rows or replace the offending column values with some default? Any further pointers on data quality checks for DataFrames or RDDs are appreciated.

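import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Mixing String and Int values makes Scala infer Seq[(String, Any)]
val theSeq = Seq(("X01", "41"),
    ("X01", 41),
    ("X01", 41),
    ("X02", "ab"),
    ("X02", "%%"))

// sc is the SparkContext, sqc the SQLContext
val newRdd = sc.parallelize(theSeq)
val rowRdd = newRdd.map(r => Row(r._1, r._2))

val theSchema = StructType(Seq(StructField("ID", StringType, true),
    StructField("Age", IntegerType, true)))
val theNewDF = sqc.createDataFrame(rowRdd, theSchema)
theNewDF.show()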

Answer 1:

First of all, passing a schema is simply a way to avoid type inference. It is not validated or enforced during DataFrame creation. On a side note, I wouldn't describe a ClassCastException as working well. For a moment I thought you had actually found a bug.
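Since the schema is not enforced for you, one option (a sketch, not part of the original answer) is to check each Row against the schema yourself before calling createDataFrame. The helper name `conformsTo` is hypothetical, and only StringType and IntegerType are actually checked; any other declared type is accepted as-is.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical helper: true if every value in `row` matches the type declared
// for its position in `schema`. Only StringType and IntegerType are checked;
// any other declared type is accepted without inspection.
def conformsTo(row: Row, schema: StructType): Boolean =
  row.length == schema.fields.length &&
    schema.fields.zipWithIndex.forall { case (field, i) =>
      val value = row.get(i)
      if (value == null) field.nullable
      else field.dataType match {
        case StringType  => value.isInstanceOf[String]
        case IntegerType => value.isInstanceOf[Int]
        case _           => true // not checked in this sketch
      }
    }
```

Applied to the question's data, `rowRdd.filter(r => conformsTo(r, theSchema))` would keep only the two `("X01", 41)` rows; `("X01", "41")` is rejected too, because its age is a String rather than an Int.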

I think the important question is how you get data like theSeq / newRdd in the first place. Do you parse it yourself, or is it received from an external component? Simply by looking at the type (Seq[(String, Any)] / RDD[(String, Any)] respectively), you already know it is not valid input for a DataFrame. The way to handle things at this level is probably to embrace static typing. Scala provides quite a few neat ways to handle unexpected conditions (Try, Either, Option), of which the last is the simplest and, as a bonus, works well with Spark SQL. A rather simplistic way to handle things could look like this:

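import org.apache.spark.rdd.RDD

// Accept only values that are already Ints
def validateInt(x: Any): Option[Int] = x match {
  case i: Int => Some(i)
  case _ => None
}

// Accept only values that are already Strings
def validateString(x: Any): Option[String] = x match {
  case s: String => Some(s)
  case _ => None
}

val newRddOption: RDD[(Option[String], Option[Int])] = newRdd.map{
  case (id, age) => (validateString(id), validateInt(age))}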
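validateInt above only accepts values that are already Ints, so the string "41" in the first row becomes None. If numeric strings should be accepted, Try (one of the alternatives mentioned above) can do the parsing, and Either can keep the reason a value was rejected. This is a sketch; `parseInt` and `validateIntEither` are hypothetical names.

```scala
import scala.util.Try

// Hypothetical lenient variant: accepts Ints and strings that parse as Ints.
def parseInt(x: Any): Option[Int] = x match {
  case i: Int    => Some(i)
  case s: String => Try(s.trim.toInt).toOption // "ab", "%%" -> None
  case _         => None
}

// Hypothetical Either-based variant that records why a value was rejected.
def validateIntEither(x: Any): Either[String, Int] =
  parseInt(x).toRight(s"not an integer: $x")
```

For example, `parseInt("41")` gives `Some(41)`, while `validateIntEither("%%")` gives `Left("not an integer: %%")`. Option remains the simpler choice when the reason is not needed, since Spark SQL maps None to null.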

Since Options compose easily, you can add further checks like this:

def validateAge(age: Int) = {
  if(age >= 0 && age < 150) Some(age)
  else None
}

val newRddValidated: RDD[(Option[String], Option[Int])] = newRddOption.map{
  case (id, age) => (id, age.flatMap(validateAge))}
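To see what the two validation steps do to the question's data without a cluster, the same logic can be run on a local Seq, since RDD.map behaves like Seq.map here. This sketch repeats the helper definitions so that it compiles on its own.

```scala
def validateInt(x: Any): Option[Int] = x match {
  case i: Int => Some(i)
  case _      => None
}

def validateString(x: Any): Option[String] = x match {
  case s: String => Some(s)
  case _         => None
}

def validateAge(age: Int): Option[Int] =
  if (age >= 0 && age < 150) Some(age) else None

val theSeq: Seq[(String, Any)] =
  Seq(("X01", "41"), ("X01", 41), ("X01", 41), ("X02", "ab"), ("X02", "%%"))

// Same steps as newRddOption followed by newRddValidated, on a local Seq
val validated: Seq[(Option[String], Option[Int])] = theSeq.map {
  case (id, age) => (validateString(id), validateInt(age).flatMap(validateAge))
}
```

Only the two rows whose age was a real Int come out as `Some(41)`; the string "41", "ab" and "%%" all become None.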

Next, instead of Row, which is a very crude container, I would use case classes:

case class Record(id: Option[String], age: Option[Int])

val records: RDD[Record] = newRddValidated.map{case (id, age) => Record(id, age)}

At this point all you have to do is call toDF:

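import org.apache.spark.sql.DataFrame
import sqc.implicits._  // sqc: the question's SQLContext; enables toDF (Spark 2+: spark.implicits._)

val df: DataFrame = records.toDF
df.printSchema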

// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)
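Fields that failed validation end up as null in the DataFrame, so the two options from the question (ignore invalid rows, or substitute a default) map onto DataFrame.na. A minimal sketch; the helper names and the -1 default are hypothetical.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: drop rows where "age" is null (missing or invalid).
def dropInvalidAges(df: DataFrame): DataFrame = df.na.drop(Seq("age"))

// Hypothetical helper: replace null ages with a sentinel default of -1.
def defaultInvalidAges(df: DataFrame): DataFrame = df.na.fill(Map("age" -> -1))
```

Dropping keeps the DataFrame clean but silently loses rows; a sentinel keeps the row count stable, at the cost of every downstream consumer having to know what -1 means.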

That was the hard, but arguably more elegant, way. A faster one is to let the SQL casting system do the job for you. First, let's convert everything to Strings:

val stringRdd: RDD[(String, String)] = sc.parallelize(theSeq).map(
  p => (p._1.toString, p._2.toString))

Next, create a DataFrame:

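import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col
import sqc.implicits._  // enables toDF; in Spark 2+ use spark.implicits._

val df: DataFrame = stringRdd.toDF("id", "age")

// Cast each column to its expected type; values that cannot be
// converted (e.g. "ab" or "%%" to IntegerType) become null
val expectedTypes = Seq(StringType, IntegerType)
val exprs: Seq[Column] = df.columns.toSeq.zip(expectedTypes).map{
  case (c, t) => col(c).cast(t).alias(c)}

val dfProcessed: DataFrame = df.select(exprs: _*)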

And the result:

dfProcessed.printSchema

// root
//  |-- id: string (nullable = true)
//  |-- age: integer (nullable = true)


dfProcessed.show

// +---+----+
// | id| age|
// +---+----+
// |X01|  41|
// |X01|  41|
// |X01|  41|
// |X02|null|
// |X02|null|
// +---+----+
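With 100+ columns, the expected types can also be taken directly from a StructType such as theSchema in the question, and the same cast expression can be reused to measure data quality: a value that was present in the raw string data but is null after the cast failed conversion. This is a sketch with hypothetical helper names. It assumes the raw column names match the schema field names and that ANSI mode is off, because with spark.sql.ansi.enabled=true (the default since Spark 4.0) an invalid cast raises an error instead of returning null.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, sum, when}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: cast every column to the type declared in `schema`,
// looking columns up by field name.
def castToSchema(raw: DataFrame, schema: StructType): DataFrame =
  raw.select(schema.fields.toSeq.map(f => col(f.name).cast(f.dataType).alias(f.name)): _*)

// Hypothetical helper: per column, count values that are present in the raw
// data but turn into null after casting to the declared type.
def castFailureCounts(raw: DataFrame, schema: StructType): DataFrame = {
  val checks: Seq[Column] = schema.fields.toSeq.map { f =>
    sum(when(col(f.name).isNotNull && col(f.name).cast(f.dataType).isNull, 1).otherwise(0))
      .alias(f.name)
  }
  raw.agg(checks.head, checks.tail: _*)
}
```

For the question's data (after converting everything to strings), castFailureCounts would report 0 for id and 2 for age, the "ab" and "%%" values. Running it once per load gives a cheap per-column quality report before deciding whether to drop or default the failed values.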


Answer 2:

In Spark 1.4 or older:

import org.apache.spark.sql.execution.debug._
theNewDF.typeCheck

It was removed in SPARK-9754, though. I haven't checked, but I think typeCheck was replaced by sqlContext.debug before then.