我有一个时间序列数据集,它是由一个id分配,并通过时间戳排序。 样品:
ID Timestamp Feature
"XSC" 1986-05-21 44.7530
"XSC" 1986-05-22 44.7530
"XSC" 1986-05-23 23.5678
"TM" 1982-03-08 22.2734
"TM" 1982-03-09 22.1941
"TM" 1982-03-10 22.0847
"TM" 1982-03-11 22.1741
"TM" 1982-03-12 22.1840
"TM" 1982-03-15 22.1344
我有我需要计算一些自定义的逻辑,应该每个窗口中进行,在每个分区内。 我知道星火有窗口的功能,我想使用用于此目的的丰富的支持。
我的逻辑需要在当前窗口/分区元件的总数,作为标量。 我需要做一些特定的计算(基本上,一个for循环到那个数)。
我曾尝试添加一个数列,通过做
val window = Window.partitionBy("id").orderBy("timestamp")
frame = frame.withColumn("my_cnt", count(column).over(window))
我需要做的是这样的:
var i = 1
var y = col("Feature")
var result = y
while (i < /* total number of records within each partition goes here */) {
result = result + lit(1) * lag(y, i).over(window) + /* complex computation */
i = i + 1
}
dataFrame.withColumn("Computed_Value", result)
我怎样才能获得的每个分区中记录该总数作为标量值? 我也有补充说,数“my_cnt”值,它增加了总价值为分区,但似乎没有能够在我的情况下使用它。
该collect_list
星火功能允许您汇总窗值的列表。 这个列表可以传递给udf
做一些复杂的计算
所以,如果你有源
val data = List(
("XSC", "1986-05-21", 44.7530),
("XSC", "1986-05-22", 44.7530),
("XSC", "1986-05-23", 23.5678),
("TM", "1982-03-08", 22.2734),
("TM", "1982-03-09", 22.1941),
("TM", "1982-03-10", 22.0847),
("TM", "1982-03-11", 22.1741),
("TM", "1982-03-12", 22.1840),
("TM", "1982-03-15", 22.1344),
).toDF("id", "timestamp", "feature")
.withColumn("timestamp", to_date('timestamp))
而一些复杂的功能,裹在你的记录中的UDF(表示为例如元组)
val complexComputationUDF = udf((list: Seq[Row]) => {
list
.map(row => (row.getString(0), row.getDate(1).getTime, row.getDouble(2)))
.sortBy(-_._2)
.foldLeft(0.0) {
case (acc, (id, timestamp, feature)) => acc + feature
}
})
您可以定义要么通过所有分区的数据,每条记录,或者在有序窗口的情况下,增量数据,每条记录窗口
val windowAll = Window.partitionBy("id")
val windowRunning = Window.partitionBy("id").orderBy("timestamp")
并把它一起在一个新的数据集,如:
val newData = data
// I assuming thatyou need id,timestamp & feature for the complex computattion. So I create a struct
.withColumn("record", struct('id, 'timestamp, 'feature))
// Collect all records in the partition as a list of tuples and pass them to the complexComupation
.withColumn("computedValueAll",
complexComupationUDF(collect_list('record).over(windowAll)))
// Collect records in a time ordered windows in the partition as a list of tuples and pass them to the complexComupation
.withColumn("computedValueRunning",
complexComupationUDF(collect_list('record).over(windowRunning)))
这将导致类似:
+---+----------+-------+--------------------------+------------------+--------------------+
|id |timestamp |feature|record |computedValueAll |computedValueRunning|
+---+----------+-------+--------------------------+------------------+--------------------+
|XSC|1986-05-21|44.753 |[XSC, 1986-05-21, 44.753] |113.07379999999999|44.753 |
|XSC|1986-05-22|44.753 |[XSC, 1986-05-22, 44.753] |113.07379999999999|89.506 |
|XSC|1986-05-23|23.5678|[XSC, 1986-05-23, 23.5678]|113.07379999999999|113.07379999999999 |
|TM |1982-03-08|22.2734|[TM, 1982-03-08, 22.2734] |133.0447 |22.2734 |
|TM |1982-03-09|22.1941|[TM, 1982-03-09, 22.1941] |133.0447 |44.4675 |
|TM |1982-03-10|22.0847|[TM, 1982-03-10, 22.0847] |133.0447 |66.5522 |
|TM |1982-03-11|22.1741|[TM, 1982-03-11, 22.1741] |133.0447 |88.7263 |
|TM |1982-03-12|22.184 |[TM, 1982-03-12, 22.184] |133.0447 |110.91029999999999 |
|TM |1982-03-15|22.1344|[TM, 1982-03-15, 22.1344] |133.0447 |133.0447 |
+---+----------+-------+--------------------------+------------------+--------------------+
文章来源: Spark window custom function - getting the total number of partition records