如何插入数据使用的foreach方法火花流结构蜂巢(how to insert data to HI

2019-10-29 07:02发布

我尝试插入数据使用的foreach方法HIVE表。

我使用的火花2.3.0。

这里是我的代码

   df_drop_window.writeStream
     .foreach(new ForeachWriter[Row]() {
       override def open(partitionId: Long, epochId: Long): Boolean = true
       override def process(value: Row): Unit = {
         println(s">> Processing ${value}")
         // how to onvert the value as dataframe ?
       }
       override def close(errorOrNull: Throwable): Unit = {
       }
     }).outputMode("update").start()

正如你可以在上面看到,我希望“值”转换为数据帧和数据插入到表HIVE像INSERT INTO表名(选择数据框中*)。 有人可以帮助该怎么办呢?是新来的火花流

我可以看到只有以下选项。 可有人说我怎么能转换值:行数据 框架?

我曾尝试以下,但我收到错误(org.apache.spark.SparkException:任务不序列化)

            df.writeStream
       .foreach(new ForeachWriter[Row]() {
       override def open(partitionId: Long, epochId: Long): Boolean = true
       override def process(value: Row): Unit = {
       val rowsRdd = sc.parallelize(Seq(value))
       val df2 = spark.createDataFrame(rowsRdd, schema)
       df2.createOrReplaceTempView("testing2")
       spark.sql("insert into table are.table_name1 Partition(date) select * from testing2")
       }
       override def close(errorOrNull: Throwable): Unit = {
       }
       }).outputMode("append").start()

Answer 1:

星火Session是不是在执行方面序列化,你需要广播火花会议



文章来源: how to insert data to HIVE using foreach method in spark structured streaming