Schema comparison of two dataframes in Scala

Posted on 2020-01-31 07:47

Question:

I am trying to write some test cases to validate the data between the source (.csv) file and the target (Hive table). One of the validations is structure validation of the table.

I have loaded the .csv data (using a defined schema) into one dataframe and extracted the Hive table data into another dataframe.
When I now try to compare the schemas of the two dataframes, the comparison returns false. Not sure why. Any ideas on this, please?

source dataframe schema:

scala> res39.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

target dataframe schema:

scala> targetRawData.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

When I compare, it returns false:

scala> res39.schema == targetRawData.schema
res47: Boolean = false

Data in the two dataframes is shown below:

scala> res39.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS| Naveen |             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS| Naveen |             100|   115.78|
|2015-04-02 23:24:25|2015-04-02 23:24:25|        RBS|   Arun |             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|   Arun |             100|    30.98|
|2018-06-04 10:11:12|2018-06-04 10:11:12|        XZX|   Arun |             400|     12.0|
+-------------------+-------------------+-----------+--------+----------------+---------+


scala> targetRawData.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS|  Naveen|             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS|  Naveen|             100|   115.78|
|2015-04-02 23:25:25|2015-04-02 23:25:25|        RBS|    Arun|             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|    Arun|             100|    30.98|
+-------------------+-------------------+-----------+--------+----------------+---------+

The complete code looks like this:

//import org.apache.spark
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext

  //val conf = new SparkConf().setAppName("Simple Application")
  //val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)
  val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()

   // set source and target location
    val sourceDataLocation = "hdfs://localhost:9000/source.txt"
    val targetTableName = "TableA"

    // Extract source data
    println("Extracting SAS source data from csv file location " + sourceDataLocation);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val sourceRawCsvData = sc.textFile(sourceDataLocation)

    println("Extracting target data from hive table " + targetTableName)
    val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)


    // Add the test cases here
    // Test 2 - Validate the Structure
       val headerColumns = sourceRawCsvData.first().split(",").to[List]
       val schema = TableASchema(headerColumns)

       val data = sourceRawCsvData
         .mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
         .map(_.split(",").toList)
         .map(row)

       val dataFrame = spark.createDataFrame(data,schema)
       val sourceDataFrame = dataFrame.toDF(dataFrame.columns map(_.toLowerCase): _*)
       data.collect
       data.getClass
    // Test 3 - Validate the data
    // Test 4 - Calculate the average and variance of Int or Dec columns
    // Test 5 - Test 5

  def UpdateResult(tableName: String, returnCode: Int, description: String) {
    val insertString = "INSERT INTO TestResult VALUES('" + tableName + "', " + returnCode + ",'" + description + "')"
    val a = hc.sql(insertString)
  }


  def TableASchema(columnName: List[String]): StructType = {
    StructType(
      Seq(
        StructField(name = "datetime", dataType = TimestampType, nullable = true),
        StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
        StructField(name = "source_bank", dataType = StringType, nullable = true),
        StructField(name = "emp_name", dataType = StringType, nullable = true),
        StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
        StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
        )
    )
  }

  def row(line: List[String]): Row = {
       Row(convertToTimestamp(line(0).trim), convertToTimestamp(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
    }


  def convertToTimestamp(s: String): Timestamp = s match {
    case "" => null
    case _ =>
      val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
      Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
  }

  }

Answer 1:

Based on @Derek Kaknes's answer, here is the solution I came up with for comparing schemas. It is concerned only with column name, data type and nullability, and is indifferent to metadata:

// Extract relevant information: name (key), type & nullability (values) of columns
def getCleanedSchema(df: DataFrame): Map[String, (DataType, Boolean)] = {
  df.schema.map { (structField: StructField) =>
    structField.name.toLowerCase -> (structField.dataType, structField.nullable)
  }.toMap
}

// Compare relevant information
def getSchemaDifference(schema1: Map[String, (DataType, Boolean)],
                        schema2: Map[String, (DataType, Boolean)]
                       ): Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] = {
  (schema1.keys ++ schema2.keys).
    map(_.toLowerCase).
    toList.distinct.
    flatMap { (columnName: String) =>
      val schema1FieldOpt: Option[(DataType, Boolean)] = schema1.get(columnName)
      val schema2FieldOpt: Option[(DataType, Boolean)] = schema2.get(columnName)

      if (schema1FieldOpt == schema2FieldOpt) None
      else Some(columnName -> (schema1FieldOpt, schema2FieldOpt))
    }.toMap
}
  • The getCleanedSchema method extracts the information of interest (column data type and nullability) and returns a map from column name to that tuple

  • The getSchemaDifference method returns a map containing only those columns that differ between the two schemas. If a column is absent from one of the two schemas, its corresponding properties will be None
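
Putting the two methods together, a quick usage sketch (sourceDataFrame and targetRawData are the dataframes from the question; the printing is illustrative):

// Report columns whose name, type or nullability differ between the dataframes
val diff = getSchemaDifference(getCleanedSchema(sourceDataFrame), getCleanedSchema(targetRawData))
if (diff.isEmpty) println("Schemas match")
else diff.foreach { case (col, (src, tgt)) => println(s"$col: source=$src, target=$tgt") }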



Answer 2:

I've just had the exact same problem. When you read data from Hive, the StructField entries of the schema will sometimes carry Hive metadata in their metadata field. You can't see it when printing the schemas, because this field is not part of the toString definition.

Here is the solution I decided to use: I simply take a copy of the schema with empty Metadata before comparing it:

schema.map(_.copy(metadata = Metadata.empty))
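
Note that schema.map(...) returns a Seq[StructField] rather than a StructType. A minimal sketch of the full comparison, wrapping the result back into a StructType (stripMetadata is a helper name of my own, not from the answer):

import org.apache.spark.sql.types.{Metadata, StructType}

def stripMetadata(schema: StructType): StructType =
  StructType(schema.map(_.copy(metadata = Metadata.empty)))

// true when the two schemas differ only in field metadata
stripMetadata(res39.schema) == stripMetadata(targetRawData.schema)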


Answer 3:

I've had this issue before, and it was caused by differences in the StructField.metadata attribute. It is almost impossible to identify this out of the box, as simple inspection of the StructFields will only show the name, data type and nullable values. My suggestion for debugging it would be to compare the metadata of your fields. Something like this, maybe:

res39.schema.zip(targetRawData.schema).foreach { case (r: StructField, t: StructField) =>
  println(s"Field: ${r.name}\n--| res_meta: ${r.metadata}\n--| target_meta: ${t.metadata}")
}

If you want to compare schemas but ignore metadata, then I don't have a great solution. The best I've been able to come up with is to iterate over the StructFields and manually remove the metadata, then create a temporary copy of the dataframe without metadata. So you can do something like this (assuming that df is the dataframe you want to strip of metadata):

val schemaWithoutMetadata = StructType(df.schema.map{ case f: StructField => 
  StructField(f.name, f.dataType, f.nullable)
})
val tmpDF = spark.sqlContext.createDataFrame(df.rdd, schemaWithoutMetadata)

Then you can either compare the dataframes directly or compare the schemas the way you have been attempting. I assume this solution would not be performant, so it should only be used on small datasets.
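
For instance, after stripping targetRawData the same way into a hypothetical tmpTargetDF, the original comparison from the question should then succeed:

// Rebuild the target dataframe with a metadata-free schema, same as tmpDF above
val tmpTargetDF = spark.sqlContext.createDataFrame(
  targetRawData.rdd,
  StructType(targetRawData.schema.map(f => StructField(f.name, f.dataType, f.nullable))))

tmpDF.schema == tmpTargetDF.schema  // metadata no longer interferes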



Answer 4:

Here is another solution, based on the observation that the string representation of name + DataType + nullable is unique for each column:

import org.apache.spark.sql.types.{StructType, StructField}

val schemaDiff: (StructType, StructType) => List[StructField] = (schema1, schema2) => {
  // Build a map keyed by "name-datatype-nullable", so any difference in these
  // three properties produces a distinct key
  val toMap: StructType => Map[String, StructField] = schema =>
    schema.map { sf =>
      val name = s"${sf.name}-${sf.dataType.typeName}-${sf.nullable.toString}"
      name -> sf
    }.toMap

  val schema1Set = toMap(schema1).toSet
  val schema2Set = toMap(schema2).toSet
  val commonItems = schema1Set.intersect(schema2Set)

  // Symmetric difference: fields that appear in only one of the two schemas
  (schema1Set ++ schema2Set -- commonItems).toMap.values.toList
}

Note that the field name is case sensitive, so column names that differ only in case are treated as different columns (a case-insensitive variant is sketched after the usage line below).

The steps:

  1. For each schema generate a Map[String, StructField] where each key is formatted as name-datatype-nullable
  2. Get intersection of schemas
  3. Subtract intersection from union of schemas
  4. Return the difference as a list of StructFields

Usage: schemaDiff(df1.schema, df2.schema)
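
If a case-insensitive comparison is preferred, one possible tweak (my own variation, not part of the answer above) is to lower-case the name when building the key inside toMap:

// Replaces the key-building line inside toMap
val name = s"${sf.name.toLowerCase}-${sf.dataType.typeName}-${sf.nullable.toString}"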



Answer 5:

This is a Java-level object comparison problem; you should try .equals(). This mostly works, unless different source types introduce metadata or nullability differences.
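
A minimal sketch of that check, using the dataframe names from the question:

// equals() compares the schemas field by field, including metadata
val schemasEqual = res39.schema.equals(targetRawData.schema)
println(s"Schemas equal: $schemasEqual")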