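I have a log file from a machine. init starts a new run, and load loads a new parameter set. So the general workflow is init -> load -> init -> load, and so on. If an init is followed by another init (with no load in between), the parameters from the last load are still valid. Within a run, every row, including the init row itself, should carry the parameter set loaded in that run.
Here is a simplified example input: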
import spark.implicits._ // needed for .toDF and the $"column" syntax
val ds = spark.sparkContext.parallelize(Seq(
("2017-01-01 07:15:12", "start", "nice"),
("2017-01-01 07:22:12", "init", "init"),
("2017-01-01 07:31:12", "A_status", "meas" ),
("2017-01-01 10:30:12", "load", "mdCor 0x0 TST"),
("2017-01-01 10:45:12", "B_status", "meas"),
("2017-01-01 11:47:13", "init", "init"),
("2017-01-02 02:15:12", "C_Status", "meas"),
("2017-01-02 07:22:12", "init", "init"),
("2017-01-02 07:31:12", "A_status", "meas" ),
("2017-01-02 10:30:12", "load", "mdCor 0x5 TST"),
("2017-01-02 10:45:12", "B_status", "meas"),
("2017-01-02 11:47:13", "init", "init"),
("2017-01-03 02:15:12", "C_Status", "meas"),
("2017-01-03 10:30:12", "load", "mdCor 0x6 TST")
)).toDF("datetime", "status", "msg")
It looks like this:
+-------------------+--------+-------------+
| datetime| status| msg|
+-------------------+--------+-------------+
|2017-01-01 07:15:12| start| nice|
|2017-01-01 07:22:12| init| init|
|2017-01-01 07:31:12|A_status| meas|
|2017-01-01 10:30:12| load|mdCor 0x0 TST|
|2017-01-01 10:45:12|B_status| meas|
|2017-01-01 11:47:13| init| init|
|2017-01-02 02:15:12|C_Status| meas|
|2017-01-02 07:22:12| init| init|
|2017-01-02 07:31:12|A_status| meas|
|2017-01-02 10:30:12| load|mdCor 0x5 TST|
|2017-01-02 10:45:12|B_status| meas|
|2017-01-02 11:47:13| init| init|
|2017-01-03 02:15:12|C_Status| meas|
|2017-01-03 10:30:12| load|mdCor 0x6 TST|
+-------------------+--------+-------------+
My goal is to transform it into this:
+-------------------+--------+-------------+----------+
| datetime| status| msg|param_fill|
+-------------------+--------+-------------+----------+
|2017-01-01 07:15:12| start| nice| null|
|2017-01-01 07:22:12| init| init| 0x0|
|2017-01-01 07:31:12|A_status| meas| 0x0|
|2017-01-01 10:30:12| load|mdCor 0x0 TST| 0x0|
|2017-01-01 10:45:12|B_status| meas| 0x0|
|2017-01-01 11:47:13| init| init| 0x0|
|2017-01-02 02:15:12|C_Status| meas| 0x0|
|2017-01-02 07:22:12| init| init| 0x5|
|2017-01-02 07:31:12|A_status| meas| 0x5|
|2017-01-02 10:30:12| load|mdCor 0x5 TST| 0x5|
|2017-01-02 10:45:12|B_status| meas| 0x5|
|2017-01-02 11:47:13| init| init| 0x6|
|2017-01-03 02:15:12|C_Status| meas| 0x6|
|2017-01-03 10:30:12| load|mdCor 0x6 TST| 0x6|
+-------------------+--------+-------------+----------+
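To pin down the fill rule behind this target table, here is a small plain-Scala sketch (no Spark) that applies it to rows already sorted by datetime. It assumes, as in the sample data, that the parameter is the second-to-last token of a load message; the names FillRule, Row, paramOf and fillParams are hypothetical and only for illustration.

```scala
object FillRule {
  // Hypothetical row type: (datetime, status, msg); rows are assumed sorted by datetime.
  type Row = (String, String, String)

  // Assumption: a load message looks like "mdCor 0x5 TST" and the parameter
  // is its second-to-last token.
  def paramOf(msg: String): Option[String] = {
    val tokens = msg.split(" ")
    if (tokens.length >= 2) Some(tokens(tokens.length - 2)) else None
  }

  // Every "init" starts a new cycle. All rows of a cycle get the parameter loaded
  // in that cycle, including rows before the load; a cycle without a load keeps
  // the previous cycle's parameter. A cycle with no load and no earlier
  // parameter gets None.
  def fillParams(rows: Seq[Row]): Seq[Option[String]] = {
    // Group consecutive rows into cycles; rows before the first init form cycle 0.
    val cycles = rows.foldLeft(Vector.empty[Vector[Row]]) { (acc, row) =>
      if (acc.isEmpty || row._2 == "init") acc :+ Vector(row)
      else acc.init :+ (acc.last :+ row)
    }
    // Walk the cycles in order, carrying the last known parameter forward.
    cycles.foldLeft((Option.empty[String], Vector.empty[Option[String]])) {
      case ((carried, out), cycle) =>
        // If a cycle has several loads, the last one wins.
        val loaded = cycle.filter(_._2 == "load").flatMap(r => paramOf(r._3)).lastOption
        val current = loaded.orElse(carried)
        (current, out ++ Vector.fill(cycle.size)(current))
    }._2
  }
}
```

Run on the sample rows, fillParams yields None for the start row and then the values of the param_fill column shown above.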
I already get the expected result with the code below:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//udf to return the parameter (second-to-last token) if msg contains TST
val param = udf((msg: String) => if (msg.contains("TST")) msg.split(" ").takeRight(2).head else null)
//set flag if status equals init
val ds4 = ds.withColumn("initCounter", when($"status" === "init", 1).otherwise(0))
//running sum of the flag = cycle id (same value from one init up to the next init)
val ds5 = ds4.withColumn("sum_counter", sum($"initCounter").over(Window.orderBy("datetime")))
//enter parameter value if status equals load
val ds6 = ds5.withColumn("param", when($"status" === "load", param($"msg")))
//window for parameter lifetime per test cycle
val initWindow = Window.partitionBy($"sum_counter").orderBy($"datetime").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
//fill parameter values per test cycle
val ds7 = ds6.withColumn("param_fill", last("param", true).over(initWindow))
//window over the entire table, ordered by datetime so that "last" is deterministic <- this is probably a bad idea (no partitioning, so all rows end up in a single partition)
val tableWindow = Window.orderBy("datetime").rowsBetween(Window.unboundedPreceding, Window.currentRow)
//fill all null values with last valid parameter
val ds8 = ds7.withColumn("param_fill1", last($"param_fill", true).over(tableWindow))
//drop unwanted columns and keep the forward-filled result as param_fill
val ds9 = ds8.drop("initCounter", "sum_counter", "param", "param_fill").withColumnRenamed("param_fill1", "param_fill")
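To see what each window in the steps above computes, the same four steps can be mirrored on a plain Scala collection. This is a sketch, not Spark code, and it assumes the rows are already sorted by datetime: scanLeft stands in for the running sum over Window.orderBy("datetime"), a per-cycle lookup of the last non-null value stands in for last("param", true) over initWindow, and a second scanLeft stands in for the forward fill over tableWindow. The object name WindowSteps is hypothetical.

```scala
object WindowSteps {
  // (datetime, status, msg); rows are assumed already sorted by datetime.
  type Row = (String, String, String)

  def run(rows: Seq[Row]): Seq[Option[String]] = {
    // Steps initCounter + sum_counter: running count of "init" rows (cycle id).
    val sumCounter: Seq[Int] =
      rows.map(r => if (r._2 == "init") 1 else 0).scanLeft(0)(_ + _).tail

    // Step param: second-to-last token, only on "load" rows whose msg contains "TST".
    val param: Seq[Option[String]] = rows.map { case (_, status, msg) =>
      if (status == "load" && msg.contains("TST")) Some(msg.split(" ").takeRight(2).head)
      else None
    }

    // Step param_fill: last non-null param within each sum_counter partition.
    // toMap keeps the last pair for a repeated key, i.e. the last load of a cycle.
    val lastPerCycle: Map[Int, String] =
      sumCounter.zip(param).collect { case (c, Some(p)) => c -> p }.toMap
    val paramFill: Seq[Option[String]] = sumCounter.map(lastPerCycle.get)

    // Step param_fill1: carry the last non-null param_fill forward in datetime order.
    paramFill.scanLeft(Option.empty[String])((prev, cur) => cur.orElse(prev)).tail
  }
}
```

In this form, the param_fill step visibly depends on the cycle id produced by the running sum.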
The intermediate table looks like this (each new column corresponds to one step of the code above):
+-------------------+--------+-------------+-----------+-----------+-----+----------+-----------+
| datetime| status| msg|initCounter|sum_counter|param|param_fill|param_fill1|
+-------------------+--------+-------------+-----------+-----------+-----+----------+-----------+
|2017-01-01 07:15:12| start| nice| 0| 0| null| null| null|
|2017-01-01 07:22:12| init| init| 1| 1| null| 0x0| 0x0|
|2017-01-01 07:31:12|A_status| meas| 0| 1| null| 0x0| 0x0|
|2017-01-01 10:30:12| load|mdCor 0x0 TST| 0| 1| 0x0| 0x0| 0x0|
|2017-01-01 10:45:12|B_status| meas| 0| 1| null| 0x0| 0x0|
|2017-01-01 11:47:13| init| init| 1| 2| null| null| 0x0|
|2017-01-02 02:15:12|C_Status| meas| 0| 2| null| null| 0x0|
|2017-01-02 07:22:12| init| init| 1| 3| null| 0x5| 0x5|
|2017-01-02 07:31:12|A_status| meas| 0| 3| null| 0x5| 0x5|
|2017-01-02 10:30:12| load|mdCor 0x5 TST| 0| 3| 0x5| 0x5| 0x5|
|2017-01-02 10:45:12|B_status| meas| 0| 3| null| 0x5| 0x5|
|2017-01-02 11:47:13| init| init| 1| 4| null| 0x6| 0x6|
|2017-01-03 02:15:12|C_Status| meas| 0| 4| null| 0x6| 0x6|
|2017-01-03 10:30:12| load|mdCor 0x6 TST| 0| 4| 0x6| 0x6| 0x6|
+-------------------+--------+-------------+-----------+-----------+-----+----------+-----------+
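The questions on my mind are:
Can I specify the window to fill param without adding the columns (initCounter, sum_counter)?
Is this an efficient/correct way to achieve my goal? Is there a simpler way to do this without creating columns that get dropped at the end anyway?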