I need to compare two dataframes for type validation

Posted 2020-05-09 21:51

Question:

I am comparing two dataframes. They are basically the schemas of two different data sources, one from Hive and the other from SAS 9.2.

I need to validate the structure of both data sources, so I converted each schema into a dataframe. Here they are:

The SAS schema is in the following format:

scala> metadata.show

+----+----------------+----+---+-----------+-----------+
|S_No|        Variable|Type|Len|     Format|   Informat|
+----+----------------+----+---+-----------+-----------+
|   1|        DATETIME| Num|  8|DATETIME20.|DATETIME20.|
|   2|   LOAD_DATETIME| Num|  8|DATETIME20.|DATETIME20.|
|   3|     SOURCE_BANK|Char|  1|       null|       null|
|   4|        EMP_NAME|Char| 50|       null|       null|
|   5|HEADER_ROW_COUNT| Num|  8|       null|       null|
|   6|       EMP_HOURS| Num|  8|       15.2|       15.1|
+----+----------------+----+---+-----------+-----------+

Similarly, the Hive metadata is in the following format:

df2.show

+----------------+-------------+
|        Variable|         type|
+----------------+-------------+
|        datetime|TimestampType|
|   load_datetime|TimestampType|
|     source_bank|   StringType|
|        emp_name|   StringType|
|header_row_count|  IntegerType|
|       emp_hours|   DoubleType|
+----------------+-------------+

Now I need to compare the two on column type and validate the structure. For example, the equivalent of the SAS "Num" type is "IntegerType".
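SAS 9 datasets have only two variable types, Num and Char, so the comparison is many-to-one: a Num column can legitimately be IntegerType, DoubleType, or (when it carries a DATETIME format, as DATETIME and LOAD_DATETIME do above) TimestampType on the Hive side. A minimal sketch of such a compatibility check in plain Scala, assuming the Hive types are compared by their printed names; the object name `TypeCheck` and the exact type sets are illustrative assumptions to adjust to the real data:

```scala
// Hypothetical compatibility rules: SAS type (+ optional SAS format) -> allowed Hive/Spark type names.
object TypeCheck {
  // Spark numeric types a plain SAS `Num` column might map to (assumption; extend as needed).
  val numericTypes: Set[String] =
    Set("ShortType", "IntegerType", "LongType", "FloatType", "DoubleType")

  def allowedHiveTypes(sasType: String, sasFormat: Option[String]): Set[String] =
    (sasType.trim.toUpperCase, sasFormat.map(_.trim.toUpperCase)) match {
      // SAS stores datetimes as numbers; a DATETIME format marks them as timestamps.
      // This case must come before the DATE case because "DATETIME" also starts with "DATE".
      case ("NUM", Some(f)) if f.startsWith("DATETIME") => Set("TimestampType")
      case ("NUM", Some(f)) if f.startsWith("DATE")     => Set("DateType")
      case ("NUM", _)                                   => numericTypes
      case ("CHAR", _)                                  => Set("StringType")
      case _                                            => Set.empty
    }

  def isCompatible(sasType: String, sasFormat: Option[String], hiveType: String): Boolean =
    allowedHiveTypes(sasType, sasFormat).contains(hiveType)
}
```

Passing the Format column along with Type is what lets DATETIME validate as TimestampType while EMP_HOURS (format 15.2) validates as DoubleType.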

Finally, I need to store a non-zero value as output if the schema validation is successful.
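One way to derive that flag is to reduce the per-column results to a single status. A minimal sketch, assuming 1 as the non-zero success value and 0 for failure (the question does not say which non-zero value is expected); `ColumnCheck` and `SchemaStatus` are hypothetical names:

```scala
// Hypothetical per-column result of the SAS-vs-Hive comparison.
// A None type means the column exists in only one of the two schemas.
final case class ColumnCheck(
    name: String,
    sasType: Option[String],
    hiveType: Option[String],
    typeOk: Boolean
)

object SchemaStatus {
  /** Returns 1 (non-zero) when every column exists on both sides and its types match, else 0. */
  def status(checks: Seq[ColumnCheck]): Int = {
    val allPresent = checks.forall(c => c.sasType.isDefined && c.hiveType.isDefined)
    val allTypesOk = checks.forall(_.typeOk)
    // An empty comparison is treated as a failure rather than a vacuous success.
    if (checks.nonEmpty && allPresent && allTypesOk) 1 else 0
  }
}
```

In Spark the same idea amounts to counting the rows whose type check failed and emitting the non-zero value only when that count is zero.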

How can I achieve this?

Answer 1:

You can join the two dataframes on the normalised column name and then compare the two type columns through a Map and a UDF. Below is a code sample that does that; you need to complete the map with the right values.

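import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf, upper}
import spark.implicits._ // `spark` is the SparkSession predefined in spark-shell (Spark 2.x+)

// SAS metadata (sample rows; extend with the full schema)
val sasMeta: DataFrame = Seq(
  ("1", "DATETIME", "Num", "8", "DATETIME20.", "DATETIME20."),
  ("3", "SOURCE_BANK", "Char", "1", "null", "null")
).toDF("S_No", "Variable", "Type", "Len", "Format", "Informat")

// Upper-case the column name so it can be joined with the Hive side
val sasAdapted: DataFrame = sasMeta
  .withColumn("Name", upper(col("Variable")))
  .withColumnRenamed("Type", "TypeSAS")

// Hive schema (sample rows)
val hiveMeta: DataFrame = Seq(
  ("datetime", "TimestampType"),
  ("source_bank", "StringType")
).toDF("variable", "type")

val hiveAdapted: DataFrame = hiveMeta
  .withColumn("Name", upper(col("variable")))
  .withColumnRenamed("type", "TypeHive")

val res = hiveAdapted.join(sasAdapted, Seq("Name"), "inner")

// Hive type -> SAS type; complete this map with the right values
val typeMap = Map("TimestampType" -> "Num", "StringType" -> "Char")

// getOrElse avoids a NoSuchElementException for Hive types missing from the map
def udfType(dict: Map[String, String]) =
  udf((typeVar: String) => dict.getOrElse(typeVar, "UNMAPPED"))

val result = res.withColumn("correctMapping", udfType(typeMap)(col("TypeHive")) === col("TypeSAS"))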
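The join-and-compare logic above can be checked without Spark. The sketch below is plain Scala, not Spark code: it upper-cases the names on both sides, combines them like a full outer join (an assumption that departs from the inner join above, so that a column present in only one schema shows up as a failure instead of being silently dropped), and looks up each Hive type with `Map.get`. `SchemaJoin`, `compareSchemas` and the map contents are illustrative names and values.

```scala
object SchemaJoin {
  // Hypothetical Hive type -> SAS type map; complete it with the types actually in use.
  val hiveToSas: Map[String, String] = Map(
    "TimestampType" -> "Num",
    "IntegerType"   -> "Num",
    "DoubleType"    -> "Num",
    "StringType"    -> "Char"
  )

  /** Full-outer-join-style comparison on the upper-cased column name.
    * Returns (name, sasType, hiveType, matches) for every column seen on either side. */
  def compareSchemas(
      sas: Seq[(String, String)],  // (Variable, Type) from the SAS metadata
      hive: Seq[(String, String)]  // (variable, type) from the Hive schema
  ): Seq[(String, Option[String], Option[String], Boolean)] = {
    val sasByName  = sas.map { case (n, t) => n.toUpperCase -> t }.toMap
    val hiveByName = hive.map { case (n, t) => n.toUpperCase -> t }.toMap
    val names      = (sasByName.keySet ++ hiveByName.keySet).toSeq.sorted
    names.map { name =>
      val s = sasByName.get(name)
      val h = hiveByName.get(name)
      // Map.get yields None for an unmapped Hive type instead of throwing like Map.apply.
      val expectedSas = h.flatMap(hiveToSas.get)
      (name, s, h, s.isDefined && expectedSas == s)
    }
  }
}
```

In Spark the equivalent change is passing "full_outer" as the join type and treating rows where TypeSAS or TypeHive is null as failures.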