I'm writing a Python application that slides a window over a sequence of timestamped values. I want to apply a function to the values in the sliding window in order to calculate a score from the N latest values, as shown in the figure. We have already implemented that function using a Python library that makes use of GPUs.
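To make the intent concrete, here is a simplified, CPU-only sketch of what we do today; score is just a stand-in for our GPU-backed function, and stream is assumed to yield (timestamp, value) pairs:

from collections import deque

def score(values):
    # placeholder for our GPU-backed scoring function
    return sum(values) / len(values)

N = 5
window = deque(maxlen=N)  # keeps only the N latest values

for ts, value in stream:  # stream yields (timestamp, value) pairs
    window.append(value)
    if len(window) == N:
        print(ts, score(window))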
I found that Apache Spark 2.0 ships with Structured Streaming, which supports window operations on event time. For example, to read a finite sequence of records from .csv files and count the records in such a sliding window, you can use the following PySpark code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import window
from os import getcwd

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

# each record is an event timestamp plus a double value
schema = StructType() \
    .add('ts', 'timestamp') \
    .add('value', 'double')

# watch the ./csv directory and treat arriving files as a stream of records
lines = spark \
    .readStream \
    .format('csv') \
    .schema(schema) \
    .load(path='file:///' + getcwd() + '/csv')

# 30-minute windows sliding every 10 minutes, keyed on event time
windowedCount = lines.groupBy(
    window(lines.ts, '30 minutes', '10 minutes')
).agg({'value': 'count'})

# print the full updated result table to the console on every trigger
query = windowedCount \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()
However, I want to apply aggregation functions other than the predefined ones, i.e. UDAFs, over sliding windows. According to https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg, the only available aggregate functions are avg, max, min, sum, and count.
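Ideally, I would like to write something like the following; score_udaf is hypothetical and stands for my Python scoring function wrapped as an aggregate:

# hypothetical API: score_udaf does not exist, it is what I wish I could write
windowedScore = lines.groupBy(
    window(lines.ts, '30 minutes', '10 minutes')
).agg(score_udaf(lines.value))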
Is it not supported yet? If so, when will it be supported in PySpark?
https://stackoverflow.com/a/32750733/1564381 shows that one can define a UserDefinedAggregateFunction in Java or Scala and then invoke it in PySpark (my understanding of that bridge is sketched below). It seems interesting, but I want to apply my own Python function over the values in sliding windows; I want a purely Pythonic way.
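If I understand that answer correctly, the bridge would look roughly like this; com.example.ScoreUDAF is a hypothetical Scala UserDefinedAggregateFunction compiled into a JAR passed via --jars, and _to_java_column/_to_seq are internal PySpark helpers that may change between versions:

from pyspark.sql.column import Column, _to_java_column, _to_seq

def score_udaf(col):
    # com.example.ScoreUDAF is hypothetical; it must be a Scala
    # UserDefinedAggregateFunction on the driver's classpath
    sc = spark.sparkContext
    udaf = sc._jvm.com.example.ScoreUDAF()
    return Column(udaf.apply(_to_seq(sc, [col], _to_java_column)))

But even then, the aggregation logic itself lives in Scala, which is exactly what I want to avoid.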
P.S. Please let me know of any Python frameworks other than PySpark that can solve this sort of problem (applying UDAFs over a window sliding over a stream).