Correct way to fill dataset with data based on window

Posted 2019-09-28 03:48

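I have a log file from a machine. init starts a new execution. load loads a new parameter set. So the general workflow is init -> load -> init -> load, etc. If an init is followed by another init with no load in between, the parameters loaded in the previous cycle remain valid.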

Here is a simplified example input:

import spark.implicits._ // provides toDF and the $"col" syntax (already imported in spark-shell)

val ds = spark.sparkContext.parallelize(Seq(
  ("2017-01-01 07:15:12", "start", "nice"),
  ("2017-01-01 07:22:12", "init", "init"),
  ("2017-01-01 07:31:12", "A_status", "meas" ),
  ("2017-01-01 10:30:12", "load", "mdCor 0x0 TST"),
  ("2017-01-01 10:45:12", "B_status", "meas"),
  ("2017-01-01 11:47:13", "init", "init"),
  ("2017-01-02 02:15:12", "C_Status", "meas"),
  ("2017-01-02 07:22:12", "init", "init"),
  ("2017-01-02 07:31:12", "A_status", "meas" ),
  ("2017-01-02 10:30:12", "load", "mdCor 0x5 TST"),
  ("2017-01-02 10:45:12", "B_status", "meas"),
  ("2017-01-02 11:47:13", "init", "init"),
  ("2017-01-03 02:15:12", "C_Status", "meas"),
  ("2017-01-03 10:30:12", "load", "mdCor 0x6 TST")
)).toDF("datetime", "status", "msg")

It looks like this:

+-------------------+--------+-------------+
|           datetime|  status|          msg|
+-------------------+--------+-------------+
|2017-01-01 07:15:12|   start|         nice|
|2017-01-01 07:22:12|    init|         init|
|2017-01-01 07:31:12|A_status|         meas|
|2017-01-01 10:30:12|    load|mdCor 0x0 TST|
|2017-01-01 10:45:12|B_status|         meas|
|2017-01-01 11:47:13|    init|         init|
|2017-01-02 02:15:12|C_Status|         meas|
|2017-01-02 07:22:12|    init|         init|
|2017-01-02 07:31:12|A_status|         meas|
|2017-01-02 10:30:12|    load|mdCor 0x5 TST|
|2017-01-02 10:45:12|B_status|         meas|
|2017-01-02 11:47:13|    init|         init|
|2017-01-03 02:15:12|C_Status|         meas|
|2017-01-03 10:30:12|    load|mdCor 0x6 TST|
+-------------------+--------+-------------+

My goal is to transform it into this:

+-------------------+--------+-------------+----------+
|           datetime|  status|          msg|param_fill|
+-------------------+--------+-------------+----------+
|2017-01-01 07:15:12|   start|         nice|      null|
|2017-01-01 07:22:12|    init|         init|       0x0|
|2017-01-01 07:31:12|A_status|         meas|       0x0|
|2017-01-01 10:30:12|    load|mdCor 0x0 TST|       0x0|
|2017-01-01 10:45:12|B_status|         meas|       0x0|
|2017-01-01 11:47:13|    init|         init|       0x0|
|2017-01-02 02:15:12|C_Status|         meas|       0x0|
|2017-01-02 07:22:12|    init|         init|       0x5|
|2017-01-02 07:31:12|A_status|         meas|       0x5|
|2017-01-02 10:30:12|    load|mdCor 0x5 TST|       0x5|
|2017-01-02 10:45:12|B_status|         meas|       0x5|
|2017-01-02 11:47:13|    init|         init|       0x6|
|2017-01-03 02:15:12|C_Status|         meas|       0x6|
|2017-01-03 10:30:12|    load|mdCor 0x6 TST|       0x6|
+-------------------+--------+-------------+----------+
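Stated without Spark, the rule behind param_fill can be checked directly against the target table above. The following is a minimal plain-Scala sketch (script style, e.g. for the Scala REPL), assuming the entries are already sorted by datetime; `Entry`, `paramOf` and `fillParams` are hypothetical names used only for illustration.

```scala
// Plain-Scala model of the param_fill rule (no Spark); assumes entries are sorted by datetime.
// Entry, paramOf and fillParams are hypothetical names used only for this sketch.
final case class Entry(datetime: String, status: String, msg: String)

// Second-to-last token of a load message, e.g. "mdCor 0x5 TST" -> "0x5".
def paramOf(e: Entry): Option[String] =
  if (e.status == "load" && e.msg.contains("TST")) e.msg.split(" ").takeRight(2).headOption
  else None

def fillParams(entries: Seq[Entry]): Seq[Option[String]] = {
  // 1. Cycle id per row: running count of "init" rows (0 before the first init).
  val cycles: Seq[Int] =
    entries.scanLeft(0)((c, e) => if (e.status == "init") c + 1 else c).tail
  // 2. Parameter loaded inside each cycle; if a cycle had several loads, the last one wins.
  val loadedIn: Map[Int, String] =
    entries.zip(cycles).flatMap { case (e, c) => paramOf(e).map(p => c -> p) }.toMap
  // 3. Effective parameter per cycle: its own load, else the one carried over from earlier cycles.
  val lastCycle = if (cycles.isEmpty) 0 else cycles.max
  val effective: IndexedSeq[Option[String]] =
    (0 to lastCycle).scanLeft(Option.empty[String])((prev, c) => loadedIn.get(c).orElse(prev)).tail
  cycles.map(c => effective(c))
}

val sample = Seq(
  Entry("2017-01-01 07:15:12", "start", "nice"),
  Entry("2017-01-01 07:22:12", "init", "init"),
  Entry("2017-01-01 07:31:12", "A_status", "meas"),
  Entry("2017-01-01 10:30:12", "load", "mdCor 0x0 TST"),
  Entry("2017-01-01 10:45:12", "B_status", "meas"),
  Entry("2017-01-01 11:47:13", "init", "init"),
  Entry("2017-01-02 02:15:12", "C_Status", "meas"),
  Entry("2017-01-02 07:22:12", "init", "init"),
  Entry("2017-01-02 07:31:12", "A_status", "meas"),
  Entry("2017-01-02 10:30:12", "load", "mdCor 0x5 TST"),
  Entry("2017-01-02 10:45:12", "B_status", "meas"),
  Entry("2017-01-02 11:47:13", "init", "init"),
  Entry("2017-01-03 02:15:12", "C_Status", "meas"),
  Entry("2017-01-03 10:30:12", "load", "mdCor 0x6 TST")
)

sample.zip(fillParams(sample)).foreach { case (e, p) =>
  println(s"${e.datetime}  ${e.status}  ${p.getOrElse("null")}")
}
```

Step 2 corresponds to the per-cycle window in the Spark code below, and step 3 to the table-wide carry-over window.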

I got the expected result with the following code:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// udf returning the parameter (second-to-last token) if msg contains TST, otherwise null
val param = udf((msg: String) => if (msg.contains("TST")) msg.split(" ").takeRight(2).take(1)(0) else null)
// flag rows whose status is init
val ds4 = ds.withColumn("initCounter", when($"status" === "init", 1).otherwise(0))
// running count of inits, used as the window key for filling parameters later
// (same value from one status init up to the next)
val ds5 = ds4.withColumn("sum_counter", sum($"initCounter").over(Window.orderBy("datetime")))
// extract the parameter value if status equals load
val ds6 = ds5.withColumn("param", when($"status" === "load", param($"msg")))
// window for the parameter lifetime per test cycle
val initWindow = Window.partitionBy($"sum_counter").orderBy($"datetime").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
// fill parameter values per test cycle
val ds7 = ds6.withColumn("param_fill", last("param", true).over(initWindow))
// window over the entire table <- this is probably a bad idea
// (ordered by datetime so that "last" really means the latest preceding row)
val tableWindow = Window.orderBy("datetime").rowsBetween(Window.unboundedPreceding, Window.currentRow)
// fill the remaining null values with the last valid parameter
val ds8 = ds7.withColumn("param_fill1", last($"param_fill", true).over(tableWindow))
// drop unwanted columns
val ds9 = ds8.drop("initCounter").drop("sum_counter").drop("param")

The table in progress looks like this (each new column corresponds to one step of the code above):

+-------------------+--------+-------------+-----------+-----------+-----+----------+-----------+
|           datetime|  status|          msg|initCounter|sum_counter|param|param_fill|param_fill1|
+-------------------+--------+-------------+-----------+-----------+-----+----------+-----------+
|2017-01-01 07:15:12|   start|         nice|          0|          0| null|      null|       null|
|2017-01-01 07:22:12|    init|         init|          1|          1| null|       0x0|        0x0|
|2017-01-01 07:31:12|A_status|         meas|          0|          1| null|       0x0|        0x0|
|2017-01-01 10:30:12|    load|mdCor 0x0 TST|          0|          1|  0x0|       0x0|        0x0|
|2017-01-01 10:45:12|B_status|         meas|          0|          1| null|       0x0|        0x0|
|2017-01-01 11:47:13|    init|         init|          1|          2| null|      null|        0x0|
|2017-01-02 02:15:12|C_Status|         meas|          0|          2| null|      null|        0x0|
|2017-01-02 07:22:12|    init|         init|          1|          3| null|       0x5|        0x5|
|2017-01-02 07:31:12|A_status|         meas|          0|          3| null|       0x5|        0x5|
|2017-01-02 10:30:12|    load|mdCor 0x5 TST|          0|          3|  0x5|       0x5|        0x5|
|2017-01-02 10:45:12|B_status|         meas|          0|          3| null|       0x5|        0x5|
|2017-01-02 11:47:13|    init|         init|          1|          4| null|       0x6|        0x6|
|2017-01-03 02:15:12|C_Status|         meas|          0|          4| null|       0x6|        0x6|
|2017-01-03 10:30:12|    load|mdCor 0x6 TST|          0|          4|  0x6|       0x6|        0x6|
+-------------------+--------+-------------+-----------+-----------+-----+----------+-----------+
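Separately from the window question, the `param` udf from step one can likely be replaced with built-in column functions, which Spark's optimizer can see through (udfs are opaque to it) and which yield null for a null msg instead of failing. A minimal sketch, assuming Spark 2.4+ (for `element_at` with a negative index); for messages shaped like "mdCor 0x5 TST" it picks the same token as `takeRight(2).take(1)(0)`. `msgs` and `withParam` are names made up for this example.

```scala
// Sketch (assumes Spark 2.4+): built-in replacement for the question's `param` udf.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// In spark-shell, getOrCreate() simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("param-builtin").getOrCreate()
import spark.implicits._

val msgs = Seq(
  ("load", "mdCor 0x0 TST"),
  ("load", "mdCor 0x5 TST"),
  ("B_status", "meas")
).toDF("status", "msg")

// split on a space, then element_at with index -2 picks the second-to-last token
// (negative indices count from the end). Rows that do not match get null (no otherwise()).
val withParam = msgs.withColumn("param",
  when($"status" === "load" && $"msg".contains("TST"), element_at(split($"msg", " "), -2)))

withParam.show()
```

In the question's pipeline this expression could stand in for `param($"msg")` in the ds6 step.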

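The questions on my mind are:
Can I specify the window for filling param without adding the columns (initCounter, sum_counter)?
Is this an efficient/correct way to achieve my goal? Is there a simpler way to do this without creating columns that end up being dropped anyway?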
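One possible way to reduce the bookkeeping, sketched under assumptions: this version still materializes the running init count as a helper column (`cycle`, dropped at the end), but replaces both fill windows with a single one. A RANGE frame ordered by the cycle id covers every row up to the end of the current cycle, so a load later in the same cycle is included, and earlier cycles supply the carry-over. Taking the max of a (datetime, param) struct picks the latest load in that frame deterministically. It assumes Spark 2.4+ (for `element_at`) and datetime strings in the fixed "yyyy-MM-dd HH:mm:ss" format, so that they sort chronologically; `cycle`, `upToCycleEnd` and `latestLoad` are names made up for this sketch.

```scala
// Sketch (assumes Spark 2.4+ and datetime strings in "yyyy-MM-dd HH:mm:ss" format).
// "cycle", "upToCycleEnd" and "latestLoad" are names made up for this example.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("param-fill").getOrCreate()
import spark.implicits._

val ds = Seq(
  ("2017-01-01 07:15:12", "start", "nice"),
  ("2017-01-01 07:22:12", "init", "init"),
  ("2017-01-01 07:31:12", "A_status", "meas"),
  ("2017-01-01 10:30:12", "load", "mdCor 0x0 TST"),
  ("2017-01-01 10:45:12", "B_status", "meas"),
  ("2017-01-01 11:47:13", "init", "init"),
  ("2017-01-02 02:15:12", "C_Status", "meas"),
  ("2017-01-02 07:22:12", "init", "init"),
  ("2017-01-02 07:31:12", "A_status", "meas"),
  ("2017-01-02 10:30:12", "load", "mdCor 0x5 TST"),
  ("2017-01-02 10:45:12", "B_status", "meas"),
  ("2017-01-02 11:47:13", "init", "init"),
  ("2017-01-03 02:15:12", "C_Status", "meas"),
  ("2017-01-03 10:30:12", "load", "mdCor 0x6 TST")
).toDF("datetime", "status", "msg")

// Helper column: running count of inits = cycle id of each row (0 before the first init).
val withCycle = ds.withColumn("cycle",
  sum(when($"status" === "init", 1).otherwise(0)).over(Window.orderBy("datetime")))

// RANGE frame on the cycle id: all rows whose cycle <= the current row's cycle,
// i.e. everything up to the END of the current cycle (a later load in the same cycle counts).
val upToCycleEnd = Window.orderBy("cycle").rangeBetween(Window.unboundedPreceding, Window.currentRow)

// Latest load in that frame: max compares the struct by datetime first; non-load rows give a
// null struct, which max ignores. If the frame contains no load, the result is null.
val latestLoad = max(when($"status" === "load" && $"msg".contains("TST"),
  struct($"datetime", element_at(split($"msg", " "), -2).as("p")))).over(upToCycleEnd)

val result = withCycle
  .withColumn("param_fill", latestLoad.getField("p"))
  .drop("cycle")
  .orderBy("datetime")

result.show(false)
```

Neither window has a partitionBy, so Spark moves all rows into a single partition for them (it logs a warning about this), just like the `Window.orderBy("datetime")` in the original code; on a large log that is the part most likely to hurt, unless the real data has a natural key to partition by.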

Source: Correct way to fill dataset with data based on window