Azure的DataBricks流的foreach失败NotSerializableExceptio

2019-10-31 09:10发布

我要不断地制定一个数据集流(最初由卡夫卡发起)的行:根据我想更新Radis哈希的条件。 这是我的代码段( lastContacts是先前的命令,这是这种类型的流的结果: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long]这个扩展到。 org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

class MyStreamProcessor extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
  }

  override def close(errorOrNull: Throwable): Unit = {}
}

val query = lastContacts
  .writeStream
  .foreach(new MyStreamProcessor())
  .start()

query.awaitTermination()

我收到了巨大的堆栈跟踪,其中相关部分(我认为)是这样的: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter

任何人都可以解释为什么会出现这种异常,以及如何避免? 谢谢!

这个问题涉及到以下两个:

  • 数据帧到RDD [(字符串,字符串)]转换
  • 调用与每个元件的功能在Databricks流

Answer 1:

星火上下文是不可序列。

ForeachWriter的任何实现都必须是可序列化,因为每个任务会得到所提供的对象的新序列化,反序列化副本。 因此,强烈建议开启(...)方法被调用后,将数据写入任何初始化(例如打开连接或启动事务)完成,这标志着该任务准备产生的数据。

在代码中,你要使用的工艺方法中的火花背景下,

override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    *sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)*
  }

为了将数据发送到Redis的,你需要创建自己的连接,并在打开的方法打开它,然后在这个过程中方法使用它。

看看如何创建的Redis连接池。 https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala



文章来源: Azure DataBricks Stream foreach fails with NotSerializableException