Currently we read the date using a Calendar instance to pick up the last month's records in Spark SQL. We now have a new requirement: in case extra events are added for a previous day, we must also be able to manually supply summary start and end dates, so that the job can be re-run manually for a previous time period. For example, a manual re-run table could look like this:
rprtng_period_type_cd  summary_start_date  summary_end_date  summary_iv
M                      2018-01-01          2018-01-31        2018-01
D                      2018-03-05          2018-03-05        2018-03-05
D                      2018-03-27          2018-03-27        2018-03-27
This should tell the job to calculate one monthly summary for January 2018 and two daily summaries, one for 5 March and one for 27 March.
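For illustration, here is a minimal sketch of how such a control table could be read from Hive; manual_rerun_ctrl is a hypothetical table name, and the table is assumed to live in the same referenceDB as the source data:

// Sketch only: manual_rerun_ctrl is a hypothetical control table name
val ctrlDf = hiveContext.sql(
  s"select rprtng_period_type_cd, summary_start_date, summary_end_date " +
  s"from $referenceDB.manual_rerun_ctrl")

// Each row describes one summary to (re)calculate
ctrlDf.collect().foreach { row =>
  val periodType = row.getString(0) // "M" or "D"
  val startDate  = row.getString(1) // e.g. "2018-01-01"
  val endDate    = row.getString(2) // e.g. "2018-01-31"
  // hand startDate/endDate to the summary calculation here
}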
The job should take summary_start_date and summary_end_date and ensure that only events with an event_dt between those two dates are included in the calculations.
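That filter could be expressed with the DataFrame API roughly like this (a sketch: eventsDf stands for the events DataFrame loaded from the source table, and summaryStartDate/summaryEndDate are assumed to be yyyy-MM-dd strings taken from the control table):

import org.apache.spark.sql.functions.{col, to_date}

// Keep only events whose event_dt lies inside the requested window (inclusive);
// eventsDf is the assumed events DataFrame read from the source table
val filteredDf = eventsDf.filter(
  to_date(col("event_dt")).between(summaryStartDate, summaryEndDate))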
My current code snippet looks like:
import java.util.Calendar

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, rank}
import org.apache.spark.storage.StorageLevel

def execute(): DataFrame = {
  // log files
  val hiveContext = SparkContextLoader.hiveContext
  val eventSourceTable = cusotmermetricConstants.source_table

  // Calendar information: derive the previous month ("MM") and its year
  val abc = Calendar.getInstance
  abc.add(Calendar.MONTH, -1)
  var year = abc.get(Calendar.YEAR)
  val fileMonth = abc.get(Calendar.MONTH) + 1 // Calendar.MONTH is zero-based
  var monthStr = if (fileMonth <= 9) "0" + fileMonth else fileMonth.toString

  // testing purpose: hard-coded period
  monthStr = "11"
  year = 2016

  // Pull all events whose event_dt falls in the computed month
  val monthlyEventDf = hiveContext.sql(
    s"select * from $referenceDB.$eventSourceTable " +
    s"where unix_timestamp(event_dt, 'yyyy-MM') = unix_timestamp('$year-$monthStr', 'yyyy-MM')")

  // Rank duplicates per event_Id so the top "somevalue" row can be kept
  val uniquedf = monthlyEventDf
    .repartition(col("event_Id"))
    .withColumn("rank", rank().over(Window.partitionBy("event_Id").orderBy(desc("somevalue"))))

  val monthlyEventfinal = monthlyEventDf.persist(StorageLevel.MEMORY_AND_DISK)
  monthlyEventfinal
}
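One option is to parameterize execute with the date window and let a small driver loop over the control table rows. A sketch under the same assumptions as above (loadControlTable is a hypothetical helper wrapping the control-table read shown earlier, and the date columns are assumed to be strings):

// Sketch: the date window comes in as parameters instead of being derived
def execute(summaryStartDate: String, summaryEndDate: String): DataFrame = {
  val hiveContext = SparkContextLoader.hiveContext
  val eventSourceTable = cusotmermetricConstants.source_table

  val eventsDf = hiveContext.sql(
    s"select * from $referenceDB.$eventSourceTable " +
    s"where to_date(event_dt) between '$summaryStartDate' and '$summaryEndDate'")

  eventsDf.persist(StorageLevel.MEMORY_AND_DISK)
}

// Driver: one run per control-table row, daily and monthly handled alike
loadControlTable().collect().foreach { row =>
  val summaryDf = execute(row.getString(1), row.getString(2))
  // write summaryDf out, keyed by rprtng_period_type_cd, as needed
}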
Where in the current module can we plug in this requirement? Looking for suggestions.