I want to continuously process the rows of a streaming Dataset (originally read from Kafka): based on a condition I want to update a Redis hash. This is my code snippet (lastContacts is the result of a previous command and has the type org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long], which is an alias for org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]; a rough sketch of how it is derived is shown after the snippet):
class MyStreamProcessor extends ForeachWriter[Row] {

  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(record: Row): Unit = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
  }

  override def close(errorOrNull: Throwable): Unit = {}
}

val query = lastContacts
  .writeStream
  .foreach(new MyStreamProcessor())
  .start()

query.awaitTermination()
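For completeness, lastContacts is derived roughly like this. It is only a simplified sketch: the bootstrap servers, the topic name and the JSON layout of the Kafka value are placeholders, not my real configuration:

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Placeholder schema for the JSON payload carried in the Kafka value.
val schema = new StructType()
  .add("serialNumber", StringType)
  .add("lastModified", LongType)

val lastContacts = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "contacts")                      // placeholder topic
  .load()
  // The Kafka value is binary; parse it into the two fields I need.
  .select(from_json(col("value").cast("string"), schema).as("payload"))
  .select("payload.serialNumber", "payload.lastModified")
```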
I receive a huge stack trace, of which the relevant part (I think) is this: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter
Could anyone explain why this exception occurs and how to avoid it? Thank you!
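In case it clarifies what I mean by "avoid": I was considering having the writer talk to Redis directly with a plain Jedis client instead of going through sc. This is only a rough sketch of the idea, with the Redis host/port hard-coded as placeholders; the hash layout (field "lastContact" keyed by serialNumber) is meant to match the snippet above:

```scala
import org.apache.spark.sql.{ForeachWriter, Row}
import redis.clients.jedis.Jedis

class JedisStreamProcessor extends ForeachWriter[Row] {

  // Connection is created on the executor in open(), not shipped from the driver.
  @transient private var jedis: Jedis = _

  override def open(partitionId: Long, version: Long): Boolean = {
    jedis = new Jedis("localhost", 6379) // placeholder host/port
    true
  }

  override def process(record: Row): Unit = {
    // serialNumber is the hash key, lastModified is stored under "lastContact".
    jedis.hset(record.getString(0), "lastContact", record.getLong(1).toString)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (jedis != null) jedis.close()
  }
}
```

Would something like this be the right direction, or is there a recommended way to keep using the spark-redis connector here?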
This question is related to the following two:
- DataFrame to RDD[(String, String)] conversion
- Call a function with each element a stream in Databricks