I have run the following PySpark code:
from pyspark import SparkContext
sc = SparkContext()
data = sc.textFile('gs://bucket-name/input_blob_path')
sorted_data = data.sortBy(lambda x: sort_criteria(x))
sorted_data.saveAsTextFile(
    'gs://bucket-name/output_blob_path',
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)
The job finished successfully. However, during the job execution Spark created many temporary blobs under the following path: gs://bucket-name/output_blob_path/_temporary/0/. I realised that removing all these temporary blobs at the end took half of the job execution time, and CPU utilisation was at 1% during this time (a huge waste of resources).
Is there a way to store temporary files on a local drive (or HDFS) instead of GCS? I would still like to persist the final results (the sorted dataset) to GCS.
We were using a Dataproc Spark cluster (VM type: 16 cores, 60 GB) with 10 worker nodes. The volume of the input data was 10 TB.
The _temporary files you see are likely an artifact of the FileOutputCommitter being used under the hood. Importantly, these temporary blobs were not strictly "temporary" data; they were in fact completed output data that only gets "renamed" to the final destination on job completion. The "commit" of these files through rename is actually fast precisely because both the source and destination are on GCS; for this reason there's no way to replace that part of the workflow with placing temporary files on HDFS and then "committing" into GCS, because then the commit would require re-plumbing the entire output dataset back out from HDFS into GCS. And specifically, the underlying Hadoop FileOutputFormat classes don't support such an idiom.
GCS itself is not a real filesystem but an "object store", and the GCS connector inside Dataproc only mimics HDFS to the best of its ability. One consequence is that deleting a directory full of files actually requires GCS to delete individual objects under the hood, rather than a real filesystem simply unlinking an inode.
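To make that cost concrete, here is a minimal sketch of what "removing a directory" amounts to on GCS, and of how you could clean up a leftover _temporary prefix yourself. It assumes the google-cloud-storage Python client library and reuses the placeholder bucket and path names from the question:

from google.cloud import storage

client = storage.Client()
bucket = client.bucket('bucket-name')
# There is no real "directory" to unlink: every object under the prefix
# has to be listed and deleted individually.
leftover = list(client.list_blobs('bucket-name', prefix='output_blob_path/_temporary/'))
bucket.delete_blobs(leftover)

For ad-hoc cleanup, running gsutil -m rm -r on the same prefix does the equivalent with parallel requests, which is usually much faster than a serial delete.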
In practice, if you're hitting this it probably means your output is split into too many files anyway, since cleanup does occur in batches of ~1000 files at a time, so up to tens of thousands of output files usually shouldn't be noticeably slow. Having too many files also makes future work on those files slower. The easiest fix is usually just to reduce the number of output files whenever possible, for example using repartition():
from pyspark import SparkContext
sc = SparkContext()
data = sc.textFile('gs://bucket-name/input_blob_path')
sorted_data = data.sortBy(lambda x: sort_criteria(x))
sorted_data.repartition(1000).saveAsTextFile(
    'gs://bucket-name/output_blob_path',
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)
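If the extra shuffle introduced by repartition() is a concern, sortBy() also accepts a numPartitions argument, so the sort itself can produce the desired number of output files. A sketch reusing the placeholder paths and sort_criteria from the question:

from pyspark import SparkContext

sc = SparkContext()
data = sc.textFile('gs://bucket-name/input_blob_path')
# Let the sort itself produce ~1000 partitions instead of repartitioning afterwards.
sorted_data = data.sortBy(lambda x: sort_criteria(x), numPartitions=1000)
sorted_data.saveAsTextFile(
    'gs://bucket-name/output_blob_path',
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)

This also keeps the output globally sorted, since no additional shuffle happens after the sort.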
I had the same question as you before. My blog post: spark speedup write file to cloud storage. Then I found this article: Spark 2.0.0 Cluster Takes a Longer Time to Append Data.
If you find that a cluster using Spark 2.0.0 takes a longer time to append data to an existing dataset and, in particular, all of the Spark jobs have finished but your command has not finished, it is because the driver node is moving the output files of tasks from the job temporary directory to the final destination one by one, which is slow with cloud storage. To resolve this issue, set mapreduce.fileoutputcommitter.algorithm.version to 2. Note that this issue does not affect overwriting a dataset or writing data to a new location.
This issue is amplified in a cloud environment, where GCS is used as the temporary storage.
How to fix it?
You can simply add this parameter to fix the problem. With algorithm version 2, each task commits its output directly to the final destination, so the driver no longer has to move files out of the job temporary directory one by one when saving to GCS. With the DataFrame API (where df is the DataFrame being written):
df.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2")
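The question uses the RDD API (saveAsTextFile) rather than the DataFrame writer, so here is a minimal sketch of applying the same setting through the Hadoop configuration using Spark's spark.hadoop.* prefix; the paths and sort_criteria are the placeholders from the question:

from pyspark import SparkConf, SparkContext

# The spark.hadoop.* prefix copies the property into the Hadoop configuration
# that saveAsTextFile's FileOutputCommitter reads.
conf = SparkConf().set(
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
sc = SparkContext(conf=conf)

data = sc.textFile('gs://bucket-name/input_blob_path')
sorted_data = data.sortBy(lambda x: sort_criteria(x))
sorted_data.saveAsTextFile(
    'gs://bucket-name/output_blob_path',
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)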
Warning!
DirectParquetOutputCommitter was removed in Spark 2.0 due to the chance of data loss.