如何转换火花流输出到数据帧或在表中存储(How to convert spark streaming

2019-09-30 00:39发布

我的代码是:

val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))
val data=lines.map(_._2)
data.print()

我的输出的格式如下具有50个不同的值

{"id:st04","data:26-02-2018 20:30:40","temp:30", "press:20"}

谁能帮我在一个表格形式存储这些数据

| id |date               |temp|press|   
|st01|26-02-2018 20:30:40| 30 |20   |  
|st01|26-02-2018 20:30:45| 80 |70   |  

我会很感激。

Answer 1:

您可以用正常的数据集API使用foreachRDD功能,一起:

data.foreachRDD(rdd => {
    // rdd is RDD[String]
    // foreachRDD is executed on the  driver, so you can use SparkSession here; spark is SparkSession, for Spark 1.x use SQLContext
    val df = spark.read.json(rdd); // or sqlContext.read.json(rdd)
    df.show(); 
    df.write.saveAsTable("here some unique table ID");
});

但是,如果你使用的Spark 2.x中,我会建议使用结构化数据流:

val stream = spark.readStream.format("kafka").load()
val data = stream
            .selectExpr("cast(value as string) as value")
            .select(from_json(col("value"), schema))
data.writeStream.format("console").start();

您必须手动指定的模式,但它很简单:)另外进口org.apache.spark.sql.functions._任何处理之前



文章来源: How to convert spark streaming output into dataframe or storing in table