I'm trying to read in data from a Spark streaming data source, window it by event time, and then run a custom Python function over the windowed data (it uses non-standard Python libraries).
My data frame looks something like this:
| Time | Value |
| 2018-01-01 12:23:50.200 | 1234 |
| 2018-01-01 12:23:51.200 | 33 |
| 2018-01-01 12:23:53.200 | 998 |
| ... | ... |
The windowing seems to work nicely with Spark SQL, using something like this:
from pyspark.sql.functions import window
windowed_df = df.groupBy(window("Time", "10 seconds"))
... and there is a section on windowing by event time in the Spark Structured Streaming docs, so I think that should work fine with Structured Streaming.
So far, so good.
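As a rough illustration of what the 10-second windowing does to the sample rows above, here is a pure-Python sketch. It assumes tumbling windows aligned to the Unix epoch with no start offset, and it assumes the sample timestamps are in UTC. The helper `window_bounds` is made up for this illustration and is not a Spark function.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_bounds(event_time, length=timedelta(seconds=10)):
    """Return (start, end) of the tumbling window containing event_time.

    Hypothetical helper, not a Spark API: windows are assumed to be
    aligned to the Unix epoch with no start offset.
    """
    # Distance of the event from the start of its window.
    offset = (event_time - EPOCH) % length
    start = event_time - offset
    return start, start + length


# The three sample rows from the table above (UTC is an assumption).
rows = [
    (datetime(2018, 1, 1, 12, 23, 50, 200000, tzinfo=timezone.utc), 1234),
    (datetime(2018, 1, 1, 12, 23, 51, 200000, tzinfo=timezone.utc), 33),
    (datetime(2018, 1, 1, 12, 23, 53, 200000, tzinfo=timezone.utc), 998),
]

for event_time, value in rows:
    start, end = window_bounds(event_time)
    print(start.time(), end.time(), value)
```

Under these assumptions, all three sample rows fall into the same window, starting at 12:23:50 and ending at 12:24:00.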
Separately, I've been able to use Spark Streaming (DStream) to apply my custom transformation operation, which currently operates on an incoming stream (basically, it assumes the data comes in correctly windowed chunks, an assumption I'm trying to get rid of). The code looks something like this:
def my_analysis(input_rdd):
    # convert RDD to native types (would also be possible from a DataFrame)
    # run through various Python libs
    # construct new RDD with results - 1 row, multiple values (could construct new DataFrame here instead)
    ...

my_dstream\
    .map(deserialize_from_string)\
    .transform(my_analysis)\
    .map(serialize_to_string)\
    .foreachRDD(write_to_sink)
I'd now essentially like to combine the two and do something like:
(df
    .groupBy(window("Time", "10 seconds"))
    .transform(my_analysis)  # how do I do this with pyspark.sql.group.GroupedData?
    .writeStream  # ...
)

# OR:

(my_dstream
    .map(deserialize_from_string)
    .window_by_event_time("10 seconds")  # how do I do this with a DStream?
    .transform(my_analysis)
    .map(serialize_to_string)
    .foreachRDD(write_to_sink))
Any idea how I might be able to accomplish the above?
Things I've tried:
- The functions I can run on windowed_df seem very limited; basically, IPython suggests I can only do aggregations here (min/max/avg/agg with pyspark.sql.functions). agg seems most useful, but the best I've found so far in that area is using collect_list, something like this:
windowed_df.agg(collect_list("Value")).sort("window").show(20, False)
... but that means I lose the timestamps.
- Custom aggregation functions (UDAF) are not supported in PySpark (SPARK-10915)
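One possible way around losing the timestamps in the collect_list approach would be to collect (Time, Value) pairs rather than bare values; in PySpark this would presumably mean wrapping both columns with pyspark.sql.functions.struct before collecting. The pure-Python sketch below only illustrates the shape of the result. `window_start`, `group_by_window`, the body of `my_analysis`, and the fourth data row are all made up for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=10)


def window_start(event_time):
    # Floor the timestamp to the start of its 10-second window
    # (aligned to midnight, which is enough for this illustration).
    midnight = event_time.replace(hour=0, minute=0, second=0, microsecond=0)
    return event_time - (event_time - midnight) % WINDOW


def group_by_window(rows):
    """Group (time, value) pairs by window, keeping the timestamps."""
    groups = defaultdict(list)
    for event_time, value in rows:
        groups[window_start(event_time)].append((event_time, value))
    return dict(groups)


def my_analysis(pairs):
    # Placeholder for the real analysis: one result row per window.
    return {
        "first": min(t for t, _ in pairs),
        "count": len(pairs),
        "total": sum(v for _, v in pairs),
    }


rows = [
    (datetime(2018, 1, 1, 12, 23, 50, 200000), 1234),
    (datetime(2018, 1, 1, 12, 23, 51, 200000), 33),
    (datetime(2018, 1, 1, 12, 23, 53, 200000), 998),
    (datetime(2018, 1, 1, 12, 24, 1, 200000), 7),  # made-up row in the next window
]

for start, pairs in sorted(group_by_window(rows).items()):
    print(start, my_analysis(pairs))
```

Each window's group still carries the original event times, so the analysis function can use them alongside the values.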
Other things I've looked at:
- Arbitrary Stateful Processing in Apache Spark’s Structured Streaming - mapGroupsWithState sounds like it might do what I want (and more), but isn't available in PySpark yet.
- Spark: How to map Python with Scala or Java User Defined Functions? - in this case, writing the UDAF in Scala/Java is not an option (I need to use a specific Python library)
- How to define UDAF over event-time windows in PySpark 2.1.0 - similar, but no answers
- Introducing Vectorized UDFs for PySpark - this might work, and the "Ordinary Least Squares Linear Regression" example using "Grouped" UDFs looks promising. However, it requires Spark 2.3.0 (I can compile that), and the JIRA ticket says UDAFs are explicitly a non-goal (it's not clear to me how exactly UDAFs and GUDF(?)s differ)
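On the UDAF versus grouped UDF distinction, the difference appears to be roughly this: a UDAF folds rows into a buffer one at a time and yields a single value per group, whereas a grouped UDF is handed the whole group at once and may return any number of rows. The pure-Python sketch below mimics both styles without Spark. `SumAggregate`, `run_aggregate`, and `grouped_function` are hypothetical names, and the partition split is arbitrary.

```python
from functools import reduce


class SumAggregate:
    """UDAF-style: an initial buffer, a per-row update, a merge of partial
    buffers, and a final evaluation that yields one value per group."""

    def zero(self):
        return 0

    def update(self, buffer, value):
        return buffer + value

    def merge(self, left, right):
        return left + right

    def evaluate(self, buffer):
        return buffer


def run_aggregate(agg, partitions):
    # Each partition is folded independently, then the partials are merged.
    partials = [reduce(agg.update, part, agg.zero()) for part in partitions]
    return agg.evaluate(reduce(agg.merge, partials, agg.zero()))


def grouped_function(values):
    """Grouped-UDF-style: sees the whole group at once and may return
    many rows (here: each value's deviation from the group mean)."""
    mean = sum(values) / len(values)
    return [value - mean for value in values]


group = [1234, 33, 998]
print(run_aggregate(SumAggregate(), [group[:2], group[2:]]))
print(grouped_function(group))
```

The second style is closer to the my_analysis function above, since it needs all rows of a window together before it can produce its result.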