I am trying to parse about 1 million HTML files using PySpark (Google Dataproc) and write the relevant fields out to a condensed file. Each HTML file is about 200KB. Hence, all the data is about 200GB.
The code below works fine if I use a subset of the data, but runs for hours and then crashes when running on the whole dataset. Furthermore, the worker nodes are not utilized (<5% CPU) so I know there is some issue.
I believe the system is choking on ingesting the data from GCS. Is there a better way to do this? Also, when I use wholeTextFiles in this fashion, does the master attempt to download all the files and then send them to the executors, or does it let the executors download them?
def my_func(keyval):
    # wholeTextFiles yields (file name, file contents) pairs
    file_name, file_str = keyval
    return parser(file_str).__dict__

data = sc.wholeTextFiles("gs://data/*")
output = data.map(my_func)
output.saveAsTextFile("gs://results/a")
Thanks! I tried the first method. It works, but is not very performant due to the exec calls and RPC/auth overhead. It takes about 10 hours to run on a 32 node cluster. I was able to run it in 30 minutes on a 4-node cluster using databricks on aws with the Amazon s3 connector. It seems there is much less overhead there. I wish Google would provide a better way to ingest data from GCS to Spark.
To answer your question: the master won't read all of the contained data, but it will fetch the status of every input file before beginning work. Dataproc sets the property "mapreduce.input.fileinputformat.list-status.num-threads" to 20 by default to speed up this lookup, but an RPC is still performed per file in GCS.
It seems you've found a case where even adding threads isn't helping very much and is just leading the driver to OOM faster.
Expanding on how to parallelize the read, I have two ideas.
But first, a bit of a warning: neither of these solutions, as written, is very robust to directories being included in the glob. You will probably want to guard against directories appearing in the list of files to read.
The first uses Python and the hadoop command line tools (this could also be done with gsutil). The example below shows how it might look: it performs the file listing on the workers, reads file content into (file name, content) pairs, and finally computes pairs of (file name, file length):
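A minimal sketch of this subprocess approach, under some assumptions: the `hadoop` binary is on the PATH of every worker, `hadoop fs -ls` prints the usual eight-column listing (permissions, replication, owner, group, size, date, time, path), and the helper names `parse_ls_output` and `name_length_pairs` and the default of 100 partitions are illustrative choices, not anything Dataproc prescribes. Directory entries are skipped, which is one way to add the guard mentioned above.

```python
import subprocess


def parse_ls_output(ls_text):
    """Return file paths from `hadoop fs -ls` output, skipping directories."""
    paths = []
    for line in ls_text.splitlines():
        # Assumed layout: perms, replication, owner, group, size, date, time, path.
        fields = line.split(None, 7)
        if len(fields) < 8:
            continue  # e.g. the "Found N items" header line
        if fields[0].startswith("d"):
            continue  # permissions starting with 'd' mark a directory
        paths.append(fields[7].strip())
    return paths


def hadoop_ls(file_glob):
    """List the files matching one glob; runs on a worker."""
    out = subprocess.check_output(["hadoop", "fs", "-ls", file_glob])
    return parse_ls_output(out.decode("utf-8"))


def hadoop_cat(file_name):
    """Read one file and return a (file name, content) pair; runs on a worker."""
    out = subprocess.check_output(["hadoop", "fs", "-cat", file_name])
    return file_name, out.decode("utf-8", errors="replace")


def name_length_pairs(sc, file_globs, num_partitions=100):
    """Build an RDD of (file name, content length) pairs from a list of globs."""
    # One partition per glob so the listings themselves run in parallel.
    globs = sc.parallelize(file_globs, max(1, len(file_globs)))
    # Spread the resulting file names out before the expensive reads.
    names = globs.flatMap(hadoop_ls).repartition(num_partitions)
    contents = names.map(hadoop_cat)
    return contents.map(lambda pair: (pair[0], len(pair[1])))
```

From the driver this might be called as `name_length_pairs(sc, ["gs://data/a*", "gs://data/b*"]).collect()`; splitting the input into several narrower globs is what lets the listing step spread across workers instead of running as a single call.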
I would first start with this subprocess solution and play with the partitioning of hadoop_ls and hadoop_cat calls and see if you can get something that is acceptable.
The second solution is more complicated, but will probably yield a pipeline that is more performant by avoiding many, many exec calls.
In this second solution, we'll be compiling a special purpose helper jar, using an initialization action to copy that jar to all workers and finally making use of the helper from our driver.
The final directory structure of our Scala jar project will look something like this:
In our PysparkHelper.scala file we will have a small scala class that functions much as our pure python solution above does. First we will create an RDD of file globs, then an RDD of file names and finally an RDD of file name and file content pairs.
The helper/build.sbt file would look something like this:
We can then build the helper with sbt:
The output helper jar should be target/scala-2.10/pyspark_support_2.10-0.1.jar
We now need to get this jar onto our cluster and to do this, we need to do two things: 1) upload the jar to GCS and 2) create an initialization action in GCS to copy the jar to cluster nodes.
For purposes of illustration, let's assume your bucket is named MY_BUCKET (insert appropriate walrus-related meme here).
Create an initialization action (let's call it pyspark_init_action.sh, replacing MY_BUCKET as needed):
and finally upload the initialization action to GCS:
A cluster can now be started by passing the following flags to gcloud:
After building, uploading, and installing our new library we can finally make use of it from pyspark: