How to use window specification and join condition

Posted 2019-02-15 22:15

Question:

Here is my DF1

OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction
4295858898|^|204|^|205|^|1|^|I|!|
4295858898|^|204|^|208|^|2|^|I|!|
4295858898|^|204|^|209|^|2|^|I|!|
4295858898|^|204|^|211|^|3|^|I|!|
4295858898|^|204|^|212|^|3|^|I|!|
4295858898|^|204|^|214|^|4|^|I|!|
4295858898|^|204|^|215|^|4|^|I|!|
4295858898|^|206|^|207|^|1|^|I|!|
4295858898|^|206|^|210|^|2|^|I|!|
4295858898|^|206|^|213|^|3|^|I|!|

Here is my DF2

DataPartition|^|PartitionYear|^|TimeStamp|^|OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
SelfSourcedPublic|^|2002|^|1511224917595|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917596|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917597|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917598|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917599|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917600|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917601|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917602|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917603|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917604|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917605|^|4295858941|^|1|^|2|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917606|^|4295858941|^|1|^|3|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917607|^|4295858941|^|5|^|6|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917608|^|4295858941|^|5|^|7|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917609|^|4295858941|^|12|^|10|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917610|^|4295858941|^|12|^|11|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917611|^|4295858941|^|1|^|13|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917612|^|4295858941|^|12|^|14|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917613|^|4295858941|^|5|^|15|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917614|^|4295858941|^|5|^|16|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917615|^|4295858941|^|1|^|17|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917616|^|4295858941|^|1|^|18|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917617|^|4295858941|^|5|^|19|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917618|^|4295858941|^|5|^|20|^|2|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917619|^|4295858941|^|5|^|21|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917620|^|4295858941|^|1|^|22|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917621|^|4295858941|^|1|^|23|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1511224917622|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1511224917642|^|4295858941|^|null|^|35|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1511224917643|^|4295858941|^|null|^|36|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1511224917644|^|4295858941|^|null|^|37|^|null|^|D|!|

I want the join to depend on the value of a column.

This is what I am trying to achieve in Spark with Scala, but I don't know how to implement it.

If FFAction_1 = I in DF2, then the following applies:

(join and partitionBy on three columns "OrganizationId", "AnnualPeriodId","InterimPeriodId")

val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId", "InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)

val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId"), "outer")
  .select($"OrganizationId", $"AnnualPeriodId", $"InterimPeriodId",
    when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|")))
      .otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
  .filter(!$"FFAction".contains("D"))

If FFAction_1 = O or D, then the following applies:

(join and partitionBy on two columns "OrganizationId","InterimPeriodId")

val windowSpec = Window.partitionBy("OrganizationId", "InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)

val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId"), "outer")
  .select($"OrganizationId", $"AnnualPeriodId", $"InterimPeriodId",
    when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|")))
      .otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
  .filter(!$"FFAction".contains("D"))

Below is my full code

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// rank, first, col, lit, when, concat_ws, regexp_replace and input_file_name are all used below
import org.apache.spark.sql.functions._

val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val get_cus_YearPartition = spark.udf.register("get_cus_YearPartition", (filePath: String) => filePath.split("\\.")(4))

val rdd = sc.textFile("s3://trfsmallfffile/Interim2Annual/MAIN")
val header = rdd.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal=data.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_YearPartition(input_file_name))


//Loading Incremental 

val rdd1 = sc.textFile("s3://trfsmallfffile/Interim2Annual/INCR")
val header1 = rdd1.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)


    //------------------------------- keeping only the latest rows from the incremental data ------------------------------

    import org.apache.spark.sql.expressions._
    val windowSpec = Window.partitionBy("OrganizationId","AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey1 = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank")


    val windowSpec2 = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", first("FFAction|!|").over(windowSpec2))
      .filter($"tobefiltered" === "I|!|" || $"tobefiltered" === "O|!|" || ($"tobefiltered" === "D|!|" && $"FFAction|!|" === "D|!|"))
      .drop("tobefiltered", "TimeStamp")

//-----------------separating the incremental df for insert, deletion and overwrite----------------

    //---------------insert rows are selected -------------------------------
    // insert a row if I is detected; if O is found, first delete and then insert

    val insertdf = latestForEachKey.filter($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|").select(df1resultFinalWithYear.schema.fieldNames.map(col):_*)

    //------------------deleted rows with primary key "OrganizationId", "InterimPeriodId"------------------
    // delete rows from the parent if either D or O is found in the incremental data
    val deletedf = latestForEachKey.filter($"FFAction|!|" === "D|!|" || $"FFAction|!|" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

    //join by two primary keys for deletion and delete from the parent dataframe
    val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")

val dfToSave=dfMainOutput.union(insertdf).withColumn("FFAction|!|", when($"FFAction|!|" === "O|!|" || $"FFAction|!|" === "I|!|", lit("I|!|")))

val dfMainOutputFinal = dfToSave.na.fill("").select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq

val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)


dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition", "PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/Interim2Annual/output")

val FFRowCount = dfMainOutputFinalWithoutNull.groupBy("DataPartition", "PartitionYear").count

FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trfsmallfffile/Interim2Annual/Descr")

Answer 1:

DISCLAIMER: This question and the other one I've just answered look like duplicates, so either one of them will be marked as such soon, or we will find out the difference between them and this disclaimer will go away. Time will tell.


Given the requirement to select the final window specification and join condition based on the values of the FFAction_1 column, I'd filter first and then decide which window aggregation and join to use.

val df1 = spark.
  read.
  option("header", true).
  option("sep", "|").
  csv("df1.csv").
  select("OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber", "FFAction")
scala> df1.show
+--------------+--------------+---------------+-------------+--------+
|OrganizationId|AnnualPeriodId|InterimPeriodId|InterimNumber|FFAction|
+--------------+--------------+---------------+-------------+--------+
|    4295858898|           204|            205|            1|       I|
|    4295858898|           204|            208|            2|       I|
|    4295858898|           204|            209|            2|       I|
|    4295858898|           204|            211|            3|       I|
|    4295858898|           204|            212|            3|       I|
|    4295858898|           204|            214|            4|       I|
|    4295858898|           204|            215|            4|       I|
|    4295858898|           206|            207|            1|       I|
|    4295858898|           206|            210|            2|       I|
|    4295858898|           206|            213|            3|       I|
+--------------+--------------+---------------+-------------+--------+

The right-hand side of the join is fairly similar in "shape".

val df2 = spark.
  read.
  option("header", true).
  option("sep", "|").
  csv("df2.csv").
  select("DataPartition_1", "PartitionYear_1", "TimeStamp", "OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber_1", "FFAction_1")
scala> df2.show
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+
|  DataPartition_1|PartitionYear_1|    TimeStamp|OrganizationId|AnnualPeriodId|InterimPeriodId|InterimNumber_1|FFAction_1|
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+
|SelfSourcedPublic|           2002|1510725106270|    4295858941|            24|             25|              4|         O|
|SelfSourcedPublic|           2002|1510725106271|    4295858941|            24|             25|              5|         O|
|SelfSourcedPublic|           2003|1510725106272|    4295858941|            30|             31|              2|         O|
|SelfSourcedPublic|           2003|1510725106273|    4295858941|            30|             31|              3|         O|
|SelfSourcedPublic|           2001|1510725106293|    4295858941|             5|             20|              2|         O|
|SelfSourcedPublic|           2001|1510725106294|    4295858941|             5|             21|              3|         O|
|SelfSourcedPublic|           2002|1510725106295|    4295858941|             1|             22|              4|         O|
|SelfSourcedPublic|           2002|1510725106296|    4295858941|             1|             23|              5|         O|
|SelfSourcedPublic|           2016|1510725106297|    4295858941|            35|             36|              1|         I|
|SelfSourcedPublic|           2016|1510725106297|    4295858941|            35|             36|              1|         D|
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+
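
The last two rows above share the same TimeStamp for the same key, which matters for the rank()-based "latest row" filter used in the question. The following is a minimal sketch, assuming a local SparkSession and two made-up rows shaped like those last two rows; the names tied, byKeyNewestFirst, keptByRank and keptByRowNumber are illustrative and do not come from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank, row_number}
import org.apache.spark.sql.types.LongType

// Assumption: a local session is enough for this illustration.
val spark = SparkSession.builder.master("local[1]").appName("rank-ties-sketch").getOrCreate()
import spark.implicits._

// Two rows for the same key with an identical TimeStamp (hypothetical stand-in data).
val tied = Seq(
  ("1510725106297", "4295858941", "35", "36", "I"),
  ("1510725106297", "4295858941", "35", "36", "D")
).toDF("TimeStamp", "OrganizationId", "AnnualPeriodId", "InterimPeriodId", "FFAction_1")

// Same window shape as in the question: partition by key, newest TimeStamp first.
val byKeyNewestFirst = Window
  .partitionBy("OrganizationId", "AnnualPeriodId", "InterimPeriodId")
  .orderBy(col("TimeStamp").cast(LongType).desc)

// rank() gives tied rows the same rank, so both rows pass the rank === 1 filter.
val keptByRank = tied
  .withColumn("rank", rank().over(byKeyNewestFirst))
  .filter(col("rank") === 1)
  .count()

// row_number() numbers the rows 1, 2, ... within each partition, so one row passes.
// Which of the tied rows that is stays arbitrary unless the ordering gets a tie-breaker.
val keptByRowNumber = tied
  .withColumn("rn", row_number().over(byKeyNewestFirst))
  .filter(col("rn") === 1)
  .count()
```

If exactly one row per key is required, row_number() with an additional ordering column as a tie-breaker is probably the safer choice; whether ties can occur in the real data is something only the data owner can tell.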

With the above datasets, I'd filter df2 to check whether there is at least one I in the FFAction_1 column, and select the window specification and join condition accordingly.

The trick is to use the join operator followed by the where (or filter) operator, so that the join condition can be chosen at runtime.

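// true when df2 has no row with FFAction_1 = I
val noIs = df2.filter($"FFAction_1" === "I").take(1).isEmpty
// windowSpecForOs/joinForOs and windowSpecForIs/joinForIs are not defined here;
// they stand for the two window specifications and join conditions from the question
val (windowSpec, joinCond) = if (noIs) {
  (windowSpecForOs, joinForOs)
} else {
  (windowSpecForIs, joinForIs)
}
val latestForEachKey = df2result.withColumn("rank", rank() over windowSpec)
val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey).where(joinCond)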
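
The snippet above leaves windowSpecForIs, windowSpecForOs, joinForIs and joinForOs undefined. One possible way to fill them in is sketched below, under several assumptions: a local SparkSession, tiny made-up inputs named left and right that are shaped like the sample data, a hypothetical helper sameKeys, and the aliases "l" and "r" to keep the equally named key columns apart. The sketch also passes the condition to join directly instead of using a separate where, which should be equivalent for an inner join.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank}
import org.apache.spark.sql.types.LongType

// Assumption: a local session is enough for this illustration.
val spark = SparkSession.builder.master("local[1]").appName("conditional-join-sketch").getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for df1 and df2 (made-up rows shaped like the sample data).
val left = Seq(("4295858941", "35", "36", "I"))
  .toDF("OrganizationId", "AnnualPeriodId", "InterimPeriodId", "FFAction")
val right = Seq(
  ("1511224917621", "4295858941", "35", "36", "O"),
  ("1511224917622", "4295858941", "35", "36", "I")
).toDF("TimeStamp", "OrganizationId", "AnnualPeriodId", "InterimPeriodId", "FFAction_1")

// Newest TimeStamp first, as in the question.
val newestFirst = col("TimeStamp").cast(LongType).desc

// The two window specifications described in the question.
val windowSpecForIs = Window
  .partitionBy("OrganizationId", "AnnualPeriodId", "InterimPeriodId")
  .orderBy(newestFirst)
val windowSpecForOs = Window
  .partitionBy("OrganizationId", "InterimPeriodId")
  .orderBy(newestFirst)

// Hypothetical helper: equality on the given key columns between the sides aliased "l" and "r".
def sameKeys(keys: String*): Column =
  keys.map(k => col(s"l.$k") === col(s"r.$k")).reduce(_ && _)

val joinForIs = sameKeys("OrganizationId", "AnnualPeriodId", "InterimPeriodId")
val joinForOs = sameKeys("OrganizationId", "InterimPeriodId")

// Pick the pair at runtime, depending on whether any I row exists.
val noIs = right.filter($"FFAction_1" === "I").take(1).isEmpty
val (windowSpec, joinCond) =
  if (noIs) (windowSpecForOs, joinForOs) else (windowSpecForIs, joinForIs)

// Keep the newest row per key, then join with the chosen condition.
val latest = right
  .withColumn("rank", rank().over(windowSpec))
  .filter($"rank" === 1)
  .drop("rank", "TimeStamp")
val joined = left.as("l").join(latest.as("r"), joinCond)

// Collected for inspection: the FFAction_1 values that survived.
val actions = joined.select($"FFAction_1").as[String].collect().toList
```

Note that this decision is made once for the whole of df2 (is there any I at all?), not row by row. If the choice has to follow each row's own FFAction_1 value, df2 would likely need to be split by that value first and each part handled with its own window and join.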