I have a Spark application that performs a large join
val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
and then aggregates the resulting DataFrame down to one with maybe 13k rows. In the course of the join, the job fails with the following error message:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)
This was happening before without setting spark.driver.maxResultSize, and so I set spark.driver.maxResultSize=2G. Then I made a slight change to the join condition, and the error resurfaces.
Edit: In resizing the cluster, I also doubled the number of partitions the DataFrame assumes, from a .coalesce(256) to a .coalesce(512), so I can't be sure it's not because of that.
My question is: since I am not collecting anything to the driver, why should spark.driver.maxResultSize matter at all here? Is the driver's memory being used for something in the join that I'm not aware of?
Just because you don't collect anything explicitly doesn't mean that nothing is collected. Since the problem occurs during a join, the most likely explanation is that the execution plan uses a broadcast join. In that case Spark will first collect the data on the driver, and then broadcast it to the executors.
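For illustration only, here is a sketch of how a broadcast join gets forced and why it touches the driver (the broadcast hint and the joinedHinted name are assumptions, not taken from your code):

import org.apache.spark.sql.functions.broadcast

// Hinting a broadcast join: before anything is shipped to the executors,
// Spark has to pull the hinted side back to the driver, and that collection
// is subject to spark.driver.maxResultSize, even though user code never
// calls collect().
val joinedHinted = broadcast(uniqueDates)
  .join(df, $"start_date" <= $"date" && $"date" <= $"end_date")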
Depending on the configuration and pipeline:
- Make sure that spark.sql.autoBroadcastJoinThreshold is smaller than spark.driver.maxResultSize (see the quick check after this list).
- Make sure you don't force a broadcast join on data of unknown size.
- While nothing indicates it is the problem here, be careful when using Spark ML utilities. Some of these (most notably indexers) can bring significant amounts of data to the driver.
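As a quick check for the first point, you can print both limits side by side (a minimal sketch; spark is the active SparkSession and the 1g fallback is simply Spark's documented default):

// spark.sql.autoBroadcastJoinThreshold is expressed in bytes (-1 disables it);
// spark.driver.maxResultSize accepts size strings such as "1g" or "2g".
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
println(spark.sparkContext.getConf.get("spark.driver.maxResultSize", "1g"))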
To determine whether broadcasting is indeed the problem, check the execution plan and, if needed, remove broadcast hints and disable automatic broadcasts:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
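For example (a sketch; the exact operator names depend on your Spark version), compare the physical plan before and after disabling automatic broadcasts:

// If the join relies on broadcasting, an operator such as BroadcastHashJoin or
// BroadcastNestedLoopJoin fed by a BroadcastExchange shows up in the plan.
joined.explain()

// After disabling automatic broadcasts, rebuilding the DataFrame should yield
// a plan that no longer contains a broadcast exchange.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val joinedNoBroadcast =
  uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
joinedNoBroadcast.explain()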
In theory, this exception is not always related to the actual data. Technical information about task execution results is sent to the driver node in serialized form, and that information alone can take more memory than the threshold.
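A rough back-of-envelope calculation, using the 78021 tasks and the 2.0 GB limit from the error message above, shows how little per-task overhead is enough to hit the limit:

// With 78021 tasks capped at 2.0 GB in total, each task only needs to send
// back roughly 27 KB of serialized results and metadata to cross the limit.
val maxResultSizeBytes = 2L * 1024 * 1024 * 1024
val numTasks = 78021L
println(s"~${maxResultSizeBytes / numTasks} bytes per task")  // ~27524 bytes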
Proof:
The error message is constructed in org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults:
val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
That method is called from org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask:
val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
  case directResult: DirectTaskResult[_] =>
    if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
      return
    }
If the number of tasks is huge, the mentioned exception can occur even though nothing is explicitly collected to the driver.
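In that case the usual levers are to raise spark.driver.maxResultSize or to run fewer tasks; a hedged sketch (the application name and the 4g value are placeholders, and whether fewer partitions actually reduce the task count depends on where the coalesce sits in your pipeline):

import org.apache.spark.sql.SparkSession

// Give the driver more headroom for serialized task results; a value of 0
// removes the limit entirely, at the risk of exhausting driver memory instead.
val spark = SparkSession.builder()
  .appName("large-range-join")
  .config("spark.driver.maxResultSize", "4g")
  .getOrCreate()

// Alternatively, keep the number of tasks down, e.g. by coalescing to fewer
// partitions before the expensive stage, so less per-task metadata is sent back.
val fewerPartitions = df.coalesce(256)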