How to write Spark Streaming output to HDFS withou

Posted 2019-02-26 20:00

After some processing I have a DStream[(String, ArrayList[String])]. When I write it to HDFS using saveAsTextFile, every batch overwrites the data from the previous one. How can I write each new result by appending it to the previous results?

output.foreachRDD(r => {
  r.saveAsTextFile(path)
})

Edit: I would also appreciate help converting the output to Avro format and then writing it to HDFS in append mode.

2 answers
趁早两清
#2 · 2019-02-26 20:13

saveAsTextFile does not support append. If it is called with a fixed path, it overwrites the output every time. We could call saveAsTextFile(path + timestamp) to save each batch to a new file. That is the basic functionality of DStream.saveAsTextFiles(path).
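The per-batch naming idea can be sketched without any Spark dependency. The helper below, `batchPath`, is a hypothetical name and not part of Spark; it derives a distinct output directory from a base path and a batch timestamp in milliseconds, loosely mirroring the `prefix-TIME_IN_MS` naming that `saveAsTextFiles` is documented to use. The commented usage assumes the two-argument form of `foreachRDD`, which also passes the batch `Time`.

```scala
// Hypothetical helper (not a Spark API): builds one output directory per batch.
object BatchPaths {
  def batchPath(basePath: String, batchTimeMs: Long): String = {
    // Drop a trailing slash so the timestamp is appended to the directory name
    val trimmed = basePath.stripSuffix("/")
    s"$trimmed-$batchTimeMs"
  }
}

// Assumed usage inside the streaming job (requires Spark on the classpath):
//   output.foreachRDD { (rdd, time) =>
//     rdd.saveAsTextFile(BatchPaths.batchPath(path, time.milliseconds))
//   }
```

Because every batch gets its own directory, nothing is overwritten; the cost is that downstream readers have to read a glob of directories rather than a single path.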

An easily accessible format that supports append is Parquet. We first transform our data RDD to a DataFrame or Dataset and then we can benefit from the write support offered on top of that abstraction.

case class DataStructure(field1,..., fieldn)

... streaming setup, dstream declaration, ...

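// Convert each record to the case class so the RDD can become a DataFrame
val structuredOutput = outputDStream.map(record => mapFunctionRecordToDataStructure(record))
structuredOutput.foreachRDD(rdd => {
  import sparkSession.implicits._
  val df = rdd.toDF()
  // Append this batch to the existing Parquet output
  df.write.format("parquet").mode("append").save(s"$workDir/$targetFile")
})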

Note that appending to Parquet files gets more expensive over time, so rotating the target file from time to time is still a requirement.
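One way to rotate the target is to derive it from the batch time, so that each day's batches append to a separate location. The sketch below is only an illustration under assumptions: `dailyTarget`, the `day=` directory naming, and the use of UTC are all hypothetical choices, not something the answer or Spark prescribes.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper (not a Spark API): picks a per-day target directory.
object TargetRotation {
  // Assumption: days are cut in UTC; adjust the zone to match your retention policy
  private val dayFormat =
    DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC)

  def dailyTarget(workDir: String, batchTimeMs: Long): String = {
    val day = dayFormat.format(Instant.ofEpochMilli(batchTimeMs))
    s"${workDir.stripSuffix("/")}/day=$day"
  }
}

// Assumed usage inside the streaming job (requires Spark on the classpath):
//   structuredOutput.foreachRDD { (rdd, time) =>
//     import sparkSession.implicits._
//     rdd.toDF().write.format("parquet").mode("append")
//       .save(TargetRotation.dailyTarget(workDir, time.milliseconds))
//   }
```

With this scheme the append cost is bounded by one day's worth of data, and older directories can be compacted or archived independently.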

一纸荒年 Trace。
#3 · 2019-02-26 20:31

If you want to append to the same file and store it on the file system, store it as a Parquet file. You can do it like this:

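  import org.apache.spark.sql.SaveMode

  kafkaData.foreachRDD( rdd => {
    // isEmpty avoids a full count() just to skip empty batches
    if (!rdd.isEmpty()) {
      // toDF() requires the SparkSession implicits to be in scope
      val df = rdd.toDF()
      // save() writes Parquet unless spark.sql.sources.default is changed
      df.write.mode(SaveMode.Append).save("/path")
    }
  })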