在星火写入HDFS /斯卡拉阅读的zip文件(Writing to HDFS in Spark/Sc

2019-09-29 20:36发布

我写的火花/斯卡拉程序ZIP文件的读取,解压缩他们和内容写入了一组新的文件。 我能得到这个写入到本地文件系统的工作,但不知道是否有向其写入文件输出到分布式文件系统,如HDFS的方式。 代码显示below`

import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._

var i =1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file:(String, PortableDataStream)) => {   
   val zipStream = new ZipInputStream(file._2.open)            
   val entry = zipStream.getNextEntry                            
   val iter = scala.io.Source.fromInputStream(zipStream).getLines          
   val fname = f"/d/tmp/myfile$i.txt" 

   i = i + 1

   val xx = iter.mkString
   val writer = new PrintWriter(new File(fname))
   writer.write(xx)
   writer.close()

   iter                                                       
}).collect()

`

Answer 1:

您可以使用Hadoop的公共库容易写数据到HDFS(如果您正在使用SBT作为依赖manangement工具,加thath库到你的依赖)。 这样,您可以创建一个文件系统对象:

 private val fs = {
    val conf = new Configuration()
    FileSystem.get(conf)
  }

一定要与你的Hadoop集群信息(核心-site.xml中,等)配置文件系统

然后,你可以写,例如一个字符串路径(在你的情况下,你应该处理数据流),在HDFS如下:

@throws[IOException]
  def writeAsString(hdfsPath: String, content: String) {
    val path: Path = new Path(hdfsPath)
    if (fs.exists(path)) {
      fs.delete(path, true)
    }
    val dataOutputStream: FSDataOutputStream = fs.create(path)
    val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8"))
    bw.write(content)
    bw.close
  }


Answer 2:

sc.binaryFiles("/user/example/zip_dir", 10)                   //make an RDD from *.zip files in HDFS
    .flatMap((file: (String, PortableDataStream)) => {        //flatmap to unzip each file
        val zipStream = new ZipInputStream(file._2.open)      //open a java.util.zip.ZipInputStream
        val entry = zipStream.getNextEntry                    //get the first entry in the stream
        val iter = Source.fromInputStream(zipStream).getLines //place entry lines into an iterator
        iter.next                                             //pop off the iterator's first line
        iter                                                  //return the iterator
    })
    .saveAsTextFile("/user/example/quoteTable_csv/result.csv")


Answer 3:

你应该看看从官方文档的方法saveAsTextFile: http://spark.apache.org/docs/latest/programming-guide.html

它可以让你保存到HDFS:

iter.saveAsTextFile("hdfs://...")


Answer 4:

你可以试试saveAsTextFile方法。

写为文本文件的数据集的元素(或一组文本文件),在给定的目录中的本地文件系统,HDFS或任何其他的Hadoop支持的文件系统。 火花会调用每个元素的toString将其转换为文件中的一行文字。

这将每个分区保存为不同的文件,你将最终分区的数量将与您的输入文件的数量,除非你重新分割或合并。



文章来源: Writing to HDFS in Spark/Scala reading the zip files