I have read about Spark's support for gzip-style input files here, and I wonder if the same support exists for other kinds of compressed files, such as .zip files. So far I have tried processing a file compressed inside a zip archive, but Spark seems unable to read its contents successfully.
I have taken a look at Hadoop's `newAPIHadoopFile` and `newAPIHadoopRDD`, but so far I have not been able to get anything working.
In addition, Spark supports creating a partition for every file under a specified folder, like in the example below:
```java
SparkConf SpkCnf = new SparkConf().setAppName("SparkApp")
                                  .setMaster("local[4]");
JavaSparkContext Ctx = new JavaSparkContext(SpkCnf);
// Backslashes must be escaped in Java string literals
JavaRDD<String> FirstRDD = Ctx.textFile("C:\\input\\").cache();
```
where `C:\input\` points to a directory with multiple files.
If processing zipped files is possible, would it also be possible to pack every file into a single compressed archive and follow the same pattern of one partition per file?
Since Apache Spark uses Hadoop input formats, we can look at the Hadoop documentation on how to process zip files and see if there is something that works.
This site gives us an idea of how to do this (namely, we can use the ZipFileInputFormat). That being said, since zip files are not splittable (see this), your request to have a single compressed file isn't really well supported. Instead, if possible, it would be better to have a directory containing many separate zip files.

This question is similar to this other question; however, it adds the additional question of whether it would be possible to have a single zip file (which, since zip isn't a splittable format, isn't a good idea).
Below is an example which searches a directory for .zip files and creates an RDD using a custom FileInputFormat called `ZipFileInputFormat` and the `newAPIHadoopFile` API on the Spark context. It then writes those files to an output directory:

https://github.com/alvinhenrick/apache-spark-examples/blob/master/src/main/scala/com/zip/example/Unzip.scala
The ZipFileInputFormat used in the example can be found here: https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop
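As a rough sketch of that wiring (assuming the `com.cotdp.hadoop` classes from the repository above are on the classpath; the paths are illustrative):

```scala
import com.cotdp.hadoop.ZipFileInputFormat
import org.apache.hadoop.io.{BytesWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("ZipExample"))

// Each record is (name of the file inside the archive, raw bytes of that file)
val zipFileRDD = sc.newAPIHadoopFile(
  "hdfs:///data/input/*.zip",
  classOf[ZipFileInputFormat],
  classOf[Text],
  classOf[BytesWritable],
  sc.hadoopConfiguration)

zipFileRDD.map { case (name, bytes) => (name.toString, bytes.getLength) }
  .collect()
  .foreach(println)
```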
Spark default support for compressed files
According to the Spark Programming Guide:
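> All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").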
This can be expanded with information about which compression formats Hadoop supports, which can basically be checked by finding all classes extending `CompressionCodec` (docs), for example `GzipCodec`, `BZip2Codec`, `DefaultCodec` (deflate), `SnappyCodec`, and `Lz4Codec`.

Source: List the available hadoop codecs
So the above formats (and many more) can be read simply by calling:
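A minimal sketch (the path is illustrative):

```scala
// compressed text (e.g. gzip, bzip2) is decompressed transparently
val rdd = sc.textFile("hdfs:///data/logs/*.gz")
```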
Reading zip files in Spark
Unfortunately, `zip` is not on the supported list by default.

I have found a great article, Hadoop: Processing ZIP files in Map/Reduce, and some answers (example) explaining how to use an imported `ZipFileInputFormat` together with the `sc.newAPIHadoopFile` API. But this did not work for me.

My solution
Without any external dependencies, you can load your file with `sc.binaryFiles` and later decompress the `PortableDataStream`, reading the content. This is the approach I have chosen.
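A sketch of such an implicit class (the `ZipSupport` object and `ZipSparkContext` names are illustrative):

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream

import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

object ZipSupport {

  implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

    def readFile(path: String,
                 minPartitions: Int = sc.defaultMinPartitions): RDD[String] =
      if (path.toLowerCase.endsWith(".zip")) {
        sc.binaryFiles(path, minPartitions).flatMap {
          case (_: String, content: PortableDataStream) =>
            val zis = new ZipInputStream(content.open())
            zis.getNextEntry // position the stream on the single entry
            val reader = new BufferedReader(new InputStreamReader(zis))
            Iterator.continually(reader.readLine()).takeWhile(_ != null)
        }
      } else {
        // fall back to plain text reading for everything else
        sc.textFile(path, minPartitions)
      }
  }
}
```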
Using this implicit class, you need to import it and call the `readFile` method on `SparkContext`:
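For example, assuming the `ZipSupport` object from the sketch above:

```scala
import ZipSupport.ZipSparkContext // brings readFile into scope

val rdd = sc.readFile("hdfs:///data/archive.zip")
```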
And the implicit class will load your `zip` file properly and return an `RDD[String]`, just like `textFile` does.

Note: this only works for a single file in the zip archive!
For multiple-file zip support, check this answer: https://stackoverflow.com/a/45958458/1549135
You can use `sc.binaryFiles` to open the zip file in binary format, then unzip it into text format. Unfortunately, the zip file is not splittable, so you need to wait for the decompression and then perhaps trigger a shuffle (e.g. `repartition`) to balance the data in each partition.

There is an example in Python, with more info, at http://gregwiki.duckdns.org/index.php/2016/04/11/read-zip-file-in-spark/
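A rough Scala sketch of the same idea, assuming a single-entry archive (the path and partition count are illustrative):

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream

val lines = sc.binaryFiles("hdfs:///data/archive.zip")
  .flatMap { case (_, stream) =>
    val zis = new ZipInputStream(stream.open())
    zis.getNextEntry // first entry of the archive
    val reader = new BufferedReader(new InputStreamReader(zis))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }
  .repartition(16) // rebalance after the single-task decompression
```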
You can use `sc.binaryFiles` to read a zip as a binary file, and then you can map the `ZipInputStream` to a list of lines:
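A sketch of that mapping which walks every entry in each archive (the path is illustrative):

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream

import org.apache.spark.input.PortableDataStream

val lines = sc.binaryFiles("hdfs:///pathDir/*.zip")
  .flatMap { case (_: String, content: PortableDataStream) =>
    val zis = new ZipInputStream(content.open())
    // walk the archive entry by entry, reading each one as text
    Iterator.continually(zis.getNextEntry)
      .takeWhile(_ != null)
      .flatMap { _ =>
        val reader = new BufferedReader(new InputStreamReader(zis))
        Iterator.continually(reader.readLine()).takeWhile(_ != null)
      }
  }
```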
But the problem remains that the zip file is not splittable.