How to validate history data?

Posted 2019-03-04 16:23

Question:

Currently we read the date from a Calendar instance to pick the last one month of records using Spark SQL. Now we have a new requirement: in case extra events are added to a previous day, we must also be able to manually insert summary start and end dates, so that the job can be manually re-run for a previous time period. For example, a manual re-run table could look like this:

rprtng_period_type_cd  summary_start_date  summary_end_date  summary_iv
M                      2018-01-01          2018-01-31        2018-01
D                      2018-03-05          2018-03-05        2018-03-05
D                      2018-03-27          2018-03-27        2018-03-27

This should tell the job to calculate a monthly summary for January 2018 and two daily summaries, one for 5 March and one for 27 March.

The job should take summary_start_date and summary_end_date and ensure that only events with an event_dt between those two dates are included in the calculations.
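In other words, something like the following minimal sketch (illustrative only: the eventsInWindow name is made up, and it assumes an events DataFrame with an event_dt column holding yyyy-MM-dd strings):

import org.apache.spark.sql.DataFrame

// Keep only events whose event_dt lies inside the summary window.
// Plain string comparison is safe here because event_dt is yyyy-MM-dd.
def eventsInWindow(events: DataFrame,
                   summaryStartDate: String,
                   summaryEndDate: String): DataFrame =
  events.filter(s"event_dt >= '$summaryStartDate' and event_dt <= '$summaryEndDate'")

// e.g. eventsInWindow(eventsDf, "2018-03-05", "2018-03-05") for the 5 March daily summary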

My current code snippet looks like:

import java.util.Calendar
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, rank}
import org.apache.spark.storage.StorageLevel

def execute(): DataFrame = {
  // Hive context and source table
  val hiveContext = SparkContextLoader.hiveContext
  val eventSourceTable = customerMetricConstants.source_table

  // Calendar information: step back one month from today
  val cal = Calendar.getInstance
  cal.add(Calendar.MONTH, -1)
  val month = cal.get(Calendar.MONTH)  // Calendar.MONTH is 0-based
  var year = cal.get(Calendar.YEAR)
  val fileMonth = month + 1            // convert to 1-based month number
  var monthStr = if (fileMonth <= 9) "0" + fileMonth.toString else fileMonth.toString

  // Testing purpose: hard-code the period
  monthStr = "11"
  year = 2016

  // Keep only events whose event_Dt falls in the chosen month
  // ('MM' is months; lowercase 'mm' would mean minutes)
  val monthlyEventDf = hiveContext.sql(
    s"select * from $referenceDB.$eventSourceTable " +
    s"where unix_timestamp(event_Dt, 'yyyy-MM') = unix_timestamp('$year-$monthStr', 'yyyy-MM')")

  // Rank duplicates per event_Id so the latest record can be identified
  val uniqueDf = monthlyEventDf
    .repartition(col("event_Id"))
    .withColumn("rank", rank().over(Window.partitionBy("event_Id").orderBy(desc("somevalue"))))

  val monthlyEventFinal = monthlyEventDf.persist(StorageLevel.MEMORY_AND_DISK)
  monthlyEventFinal
}

Where in the current module can we add this requirement? Looking for suggestions.

Answer 1:

You can use the filter function to select records in a range, like below:

//Input df

+---+----------+----------+
| cd|start_date|  end_date|
+---+----------+----------+
|  M|2018-01-01|2018-01-31|
|  D|2018-05-03|2018-05-03|
|  D|2018-03-27|2018-03-27|
+---+----------+----------+

//Parameters startDate and endDate
val startDate="2018-03-01"
val endDate="2018-05-03"

//Filter condition
df.filter(s"start_date>='$startDate' and end_date<='$endDate'").show

//Sample Output: 
+---+----------+----------+
| cd|start_date|  end_date|
+---+----------+----------+
|  D|2018-05-03|2018-05-03|
|  D|2018-03-27|2018-03-27|
+---+----------+----------+

I hope this helps. If you want to do any calculation on the filtered records, you can pass the relevant columns to a UDF.
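For example, a minimal sketch of such a UDF (the windowDays name and the day-count logic are made up purely for illustration):

import java.time.LocalDate
import java.time.temporal.ChronoUnit
import org.apache.spark.sql.functions.{col, udf}

// Illustrative UDF: how many days each summary window covers.
val windowDays = udf { (start: String, end: String) =>
  ChronoUnit.DAYS.between(LocalDate.parse(start), LocalDate.parse(end)) + 1
}

df.filter(s"start_date>='$startDate' and end_date<='$endDate'")
  .withColumn("window_days", windowDays(col("start_date"), col("end_date")))
  .show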