I am trying to write some test cases to validate the data between source (.csv) file and target (hive table). One of the validation is the Structure validation of the table.
I have load the .csv data (using a defined schema) into one dataframe and extracted the hive table data into another dataframe.
When I now try to compare the schema of the two dataframes, it returns false. Not sure why. Any idea on this please?
source dataframe schema:
scala> res39.printSchema
root
|-- datetime: timestamp (nullable = true)
|-- load_datetime: timestamp (nullable = true)
|-- source_bank: string (nullable = true)
|-- emp_name: string (nullable = true)
|-- header_row_count: integer (nullable = true)
|-- emp_hours: double (nullable = true)
target dataframe schema:
scala> targetRawData.printSchema
root
|-- datetime: timestamp (nullable = true)
|-- load_datetime: timestamp (nullable = true)
|-- source_bank: string (nullable = true)
|-- emp_name: string (nullable = true)
|-- header_row_count: integer (nullable = true)
|-- emp_hours: double (nullable = true)
When I compare, it returns false:
scala> res39.schema == targetRawData.schema
res47: Boolean = false
Data in the two dataframes is shown below:
scala> res39.show
+-------------------+-------------------+-----------+--------+----------------+---------+
| datetime| load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03| RBS| Naveen | 100| 15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03| RBS| Naveen | 100| 115.78|
|2015-04-02 23:24:25|2015-04-02 23:24:25| RBS| Arun | 200| 2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14| RBS| Arun | 100| 30.98|
|2018-06-04 10:11:12|2018-06-04 10:11:12| XZX| Arun | 400| 12.0|
+-------------------+-------------------+-----------+--------+----------------+---------+
scala> targetRawData.show
+-------------------+-------------------+-----------+--------+----------------+---------+
| datetime| load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03| RBS| Naveen| 100| 15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03| RBS| Naveen| 100| 115.78|
|2015-04-02 23:25:25|2015-04-02 23:25:25| RBS| Arun| 200| 2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14| RBS| Arun| 100| 30.98|
+-------------------+-------------------+-----------+--------+----------------+---------+
The complete code looks like below:
//import org.apache.spark
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext
//val conf = new SparkConf().setAppName("Simple Application")
//val sc = new SparkContext(conf)
val hc = new HiveContext(sc)
val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()
// set source and target location
val sourceDataLocation = "hdfs://localhost:9000/source.txt"
val targetTableName = "TableA"
// Extract source data
println("Extracting SAS source data from csv file location " + sourceDataLocation);
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val sourceRawCsvData = sc.textFile(sourceDataLocation)
println("Extracting target data from hive table " + targetTableName)
val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)
// Add the test cases here
// Test 2 - Validate the Structure
val headerColumns = sourceRawCsvData.first().split(",").to[List]
val schema = TableASchema(headerColumns)
val data = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
.map(_.split(",").toList)
.map(row)
val dataFrame = spark.createDataFrame(data,schema)
val sourceDataFrame = dataFrame.toDF(dataFrame.columns map(_.toLowerCase): _*)
data.collect
data.getClass
// Test 3 - Validate the data
// Test 4 - Calculate the average and variance of Int or Dec columns
// Test 5 - Test 5
def UpdateResult(tableName: String, returnCode: Int, description: String){
val insertString = "INSERT INTO TestResult VALUES('" + tableName + "', " + returnCode + ",'" + description + "')"
val a = hc.sql(insertString)
}
def TableASchema(columnName: List[String]): StructType = {
StructType(
Seq(
StructField(name = "datetime", dataType = TimestampType, nullable = true),
StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
StructField(name = "source_bank", dataType = StringType, nullable = true),
StructField(name = "emp_name", dataType = StringType, nullable = true),
StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
)
)
}
def row(line: List[String]): Row = {
Row(convertToTimestamp(line(0).trim), convertToTimestamp(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
}
def convertToTimestamp(s: String) : Timestamp = s match {
case "" => null
case _ => {
val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
Try(new Timestamp(format.parse(s).getTime)) match {
case Success(t) => t
case Failure(_) => null
}
}
}
}