我要不断地制定一个数据集流(最初由卡夫卡发起)的行:根据我想更新Radis哈希的条件。 这是我的代码段( lastContacts
是先前的命令,这是这种类型的流的结果: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long]
这个扩展到。 org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
class MyStreamProcessor extends ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean = {
true
}
override def process(record: Row) = {
val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
}
override def close(errorOrNull: Throwable): Unit = {}
}
val query = lastContacts
.writeStream
.foreach(new MyStreamProcessor())
.start()
query.awaitTermination()
我收到了巨大的堆栈跟踪,其中相关部分(我认为)是这样的: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter
任何人都可以解释为什么会出现这种异常,以及如何避免? 谢谢!
这个问题涉及到以下两个:
- 数据帧到RDD [(字符串,字符串)]转换
- 调用与每个元件的功能在Databricks流