我尝试插入数据使用的foreach方法HIVE表。
我使用的火花2.3.0。
这里是我的代码
df_drop_window.writeStream
.foreach(new ForeachWriter[Row]() {
override def open(partitionId: Long, epochId: Long): Boolean = true
override def process(value: Row): Unit = {
println(s">> Processing ${value}")
// how to onvert the value as dataframe ?
}
override def close(errorOrNull: Throwable): Unit = {
}
}).outputMode("update").start()
正如你可以在上面看到,我希望“值”转换为数据帧和数据插入到表HIVE像INSERT INTO表名(选择数据框中*)。 有人可以帮助该怎么办呢?是新来的火花流
我可以看到只有以下选项。 可有人说我怎么能转换值:行数据 框架?
我曾尝试以下,但我收到错误(org.apache.spark.SparkException:任务不序列化)
df.writeStream
.foreach(new ForeachWriter[Row]() {
override def open(partitionId: Long, epochId: Long): Boolean = true
override def process(value: Row): Unit = {
val rowsRdd = sc.parallelize(Seq(value))
val df2 = spark.createDataFrame(rowsRdd, schema)
df2.createOrReplaceTempView("testing2")
spark.sql("insert into table are.table_name1 Partition(date) select * from testing2")
}
override def close(errorOrNull: Throwable): Unit = {
}
}).outputMode("append").start()