I have a Spark application that performs a large join
val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
and then aggregates the resulting DataFrame down to one with maybe 13k rows. In the course of the join, the job fails with the following error message:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)
This was happening before without setting spark.driver.maxResultSize, so I set spark.driver.maxResultSize=2G. Then I made a slight change to the join condition, and the error resurfaced.
Edit: In resizing the cluster, I also doubled the number of partitions the DataFrame assumes, from a .coalesce(256) to a .coalesce(512), so I can't be sure it's not because of that.
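For reference, the two changes above amount to roughly the following (a simplified sketch; in the real job spark.driver.maxResultSize is passed at submit time, and the names here are placeholders):

import org.apache.spark.sql.SparkSession

// Driver-side result limit, as set after the first failure
// (equivalent to --conf spark.driver.maxResultSize=2G on spark-submit).
val spark = SparkSession.builder()
  .appName("large-join")
  .config("spark.driver.maxResultSize", "2G")
  .getOrCreate()

// Partition count doubled as part of resizing the cluster: 256 -> 512.
val repartitioned = df.coalesce(512)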
My question is: since I am not collecting anything to the driver, why should spark.driver.maxResultSize matter at all here? Is the driver's memory being used for something in the join that I'm not aware of?
In theory, the exception is not always related to your data.

Technical information about task execution results is sent to the driver node in serialized form, and this information can take more memory than the threshold allows.

Proof: the error message comes from org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults, which is called from org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask.

If the number of tasks is huge (78021 here), this exception can occur.
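If that is the cause, the usual workarounds are to raise the limit or to reduce the number of tasks so that less serialized task metadata accumulates on the driver. A minimal sketch (the 4g value, the appName, and the variable names are placeholders; adjust to your cluster):

import org.apache.spark.sql.SparkSession

// Raise the driver-side cap on accumulated serialized task results
// (assumes the driver has memory headroom for a larger value).
val spark = SparkSession.builder()
  .appName("large-join")
  .config("spark.driver.maxResultSize", "4g")
  .getOrCreate()

// Alternatively, cut the number of tasks so fewer per-task results
// are sent back to the driver, e.g. by coalescing before the join.
// (df is the DataFrame from the question.)
val dfFewerTasks = df.coalesce(256)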
Just because you don't collect anything explicitly, it doesn't mean that nothing is collected. Since the problem occurs during a join, the most likely explanation is that the execution plan uses a broadcast join. In that case Spark will first collect the broadcast side to the driver and then broadcast it.
Depending on the configuration and pipeline, make sure that spark.sql.autoBroadcastJoinThreshold is smaller than spark.driver.maxResultSize.

To determine whether broadcasting is indeed the problem, check the execution plan and, if needed, remove broadcast hints and disable automatic broadcasts.
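A minimal sketch of both steps, assuming the joined DataFrame from the question and a session held in a variable called spark (setting the threshold to -1 turns automatic broadcast joins off):

// Look for BroadcastHashJoin / BroadcastNestedLoopJoin in the physical plan.
joined.explain()

// Disable automatic broadcast joins; -1 means "never broadcast automatically".
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)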