I have a large (about 85 GB compressed) gzipped file from s3 that I am trying to process with Spark on AWS EMR (right now with an m4.xlarge master instance and two m4.10xlarge core instances each with a 100 GB EBS volume). I am aware that gzip is a non-splittable file format, and I've seen it suggested that one should repartition the compressed file because Spark initially gives an RDD with one partition. However, after doing
scala> val raw = spark.read.format("com.databricks.spark.csv").
| options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
| load("s3://path/to/file.gz").
| repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields]
scala> raw.count()
and taking a look at the Spark application UI, I still see only one active executor (the other 14 are dead) with one task, and the job never finishes (or at least I've not waited long enough for it to).
- What is going on here? Can someone help me understand how Spark is working in this example?
- Should I be using a different cluster configuration?
- Unfortunately, I have no control over the mode of compression, but is there an alternative way of dealing with such a file?
If the file format is not splittable, there is no way to avoid reading the whole file on one core. To parallelize work, you have to know how to assign chunks of work to different machines. In the gzip case, suppose you divide the file into 128 MB chunks. Decompressing the nth chunk requires position information from the (n-1)th chunk, which in turn depends on the (n-2)th chunk, and so on down to the first.
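To make the dependency concrete, here is a small sketch using only Python's standard library. The sample payload and the helper name try_decompress are made up for illustration. It feeds a gzip decoder the first half of a compressed stream and then, separately, the second half on its own.

```python
import gzip
import zlib

# Hypothetical sample data: 10,000 tab-separated rows.
payload = b"".join(b"row %d\tvalue\n" % i for i in range(10000))
compressed = gzip.compress(payload)
midpoint = len(compressed) // 2

def try_decompress(chunk):
    """Return the bytes a fresh gzip decoder can recover, or None on error."""
    decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)  # expect a gzip header
    try:
        return decoder.decompress(chunk)
    except zlib.error:
        return None

# Starting at byte 0 works, even though the stream is cut short.
from_start = try_decompress(compressed[:midpoint])
# Starting in the middle fails: no header, and no earlier state to build on.
from_middle = try_decompress(compressed[midpoint:])
```

A reader that starts at the beginning recovers a prefix of the data, while one dropped into the middle recovers nothing. That is why a single task ends up reading the whole file.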
If you want to parallelize, you need to make this file splittable. One way is to unzip it and process it uncompressed, or you can unzip it, split it into several files (one file for each parallel task you want), and gzip each file.
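The second option (split, then gzip each piece) could be sketched roughly as below, using only the standard library. The function name split_gzip, the output naming scheme, and the lines_per_part parameter are assumptions for illustration. It also assumes the file is line-oriented and reachable as a local path, so an S3 object would have to be downloaded or streamed first.

```python
import gzip
import itertools

def split_gzip(src_path, dest_prefix, lines_per_part):
    """Stream one gzip file into several smaller gzip files; return their paths."""
    part_paths = []
    with gzip.open(src_path, "rb") as src:
        for part_index in itertools.count():
            # Pull the next block of lines; only one block is held in memory.
            block = list(itertools.islice(src, lines_per_part))
            if not block:
                break
            part_path = f"{dest_prefix}-{part_index:05d}.gz"
            with gzip.open(part_path, "wb") as part:
                part.writelines(block)
            part_paths.append(part_path)
    return part_paths
```

The split itself is still a single sequential pass over the original file, but it only has to happen once. After that, Spark can give each part file to its own task.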
I have faced this problem and here is the solution.
The best way to approach this problem is to unzip the .gz file before the Spark batch run. Spark can then read the unzipped file in parallel.
Code to unzip the .gz file.
import gzip
import shutil
# Read through gzip (decompressing) and write the plain bytes out.
with gzip.open('file.txt.gz', 'rb') as f_in, open('file.txt', 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)
Spark cannot parallelize reading a single gzip file.
The best you can do is split it into chunks that are each gzipped.
However, Spark is really slow at reading gzip files. You can do this to speed it up:
import gzip
file_names_rdd = sc.parallelize(list_of_files, 100)
# Each task opens its own gzip file and emits that file's lines.
lines_rdd = file_names_rdd.flatMap(lambda path: gzip.open(path).readlines())
Going through Python is twice as fast as using the native Spark gzip reader.
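One caveat with readlines() is that it loads a whole file into memory at once. A possible variation, sketched below, streams lines with a generator instead. The name iter_gzip_lines and the UTF-8 encoding are assumptions. Like the snippet above, it assumes every path in list_of_files can be opened directly from the executors, which plain gzip.open cannot do for s3:// URLs.

```python
import gzip

def iter_gzip_lines(path):
    """Yield decoded lines from one gzip file without loading it all at once."""
    with gzip.open(path, "rt", encoding="utf-8") as handle:
        for line in handle:
            yield line.rstrip("\n")

# Hypothetical use with an existing SparkContext `sc`:
# lines_rdd = sc.parallelize(list_of_files, 100).flatMap(iter_gzip_lines)
```

flatMap accepts any iterable returned by the function, so the generator can be passed in place of the lambda.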