我写的火花/斯卡拉程序ZIP文件的读取,解压缩他们和内容写入了一组新的文件。 我能得到这个写入到本地文件系统的工作,但不知道是否有向其写入文件输出到分布式文件系统,如HDFS的方式。 代码显示below`
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._
var i =1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file:(String, PortableDataStream)) => {
val zipStream = new ZipInputStream(file._2.open)
val entry = zipStream.getNextEntry
val iter = scala.io.Source.fromInputStream(zipStream).getLines
val fname = f"/d/tmp/myfile$i.txt"
i = i + 1
val xx = iter.mkString
val writer = new PrintWriter(new File(fname))
writer.write(xx)
writer.close()
iter
}).collect()
`
您可以使用Hadoop的公共库容易写数据到HDFS(如果您正在使用SBT作为依赖manangement工具,加thath库到你的依赖)。 这样,您可以创建一个文件系统对象:
private val fs = {
val conf = new Configuration()
FileSystem.get(conf)
}
一定要与你的Hadoop集群信息(核心-site.xml中,等)配置文件系统
然后,你可以写,例如一个字符串路径(在你的情况下,你应该处理数据流),在HDFS如下:
@throws[IOException]
def writeAsString(hdfsPath: String, content: String) {
val path: Path = new Path(hdfsPath)
if (fs.exists(path)) {
fs.delete(path, true)
}
val dataOutputStream: FSDataOutputStream = fs.create(path)
val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8"))
bw.write(content)
bw.close
}
sc.binaryFiles("/user/example/zip_dir", 10) //make an RDD from *.zip files in HDFS
.flatMap((file: (String, PortableDataStream)) => { //flatmap to unzip each file
val zipStream = new ZipInputStream(file._2.open) //open a java.util.zip.ZipInputStream
val entry = zipStream.getNextEntry //get the first entry in the stream
val iter = Source.fromInputStream(zipStream).getLines //place entry lines into an iterator
iter.next //pop off the iterator's first line
iter //return the iterator
})
.saveAsTextFile("/user/example/quoteTable_csv/result.csv")
你应该看看从官方文档的方法saveAsTextFile: http://spark.apache.org/docs/latest/programming-guide.html
它可以让你保存到HDFS:
iter.saveAsTextFile("hdfs://...")
你可以试试saveAsTextFile方法。
写为文本文件的数据集的元素(或一组文本文件),在给定的目录中的本地文件系统,HDFS或任何其他的Hadoop支持的文件系统。 火花会调用每个元素的toString将其转换为文件中的一行文字。
这将每个分区保存为不同的文件,你将最终分区的数量将与您的输入文件的数量,除非你重新分割或合并。