Exceeding `spark.driver.maxResultSize` without bringing any data to the driver

Posted 2020-06-08 15:05

Question:

I have a Spark application that performs a large join

val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")

and then aggregates the resulting DataFrame down to one with maybe 13k rows. In the course of the join, the job fails with the following error message:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)

This was happening before I had set spark.driver.maxResultSize at all, so I set spark.driver.maxResultSize=2G. Then I made a slight change to the join condition, and the error resurfaced.

Edit: While resizing the cluster, I also doubled the number of partitions the DataFrame is coalesced to, from .coalesce(256) to .coalesce(512), so I can't be sure the error isn't because of that.

My question is, since I am not collecting anything to the driver, why should spark.driver.maxResultSize matter at all here? Is the driver's memory being used for something in the join that I'm not aware of?

Answer 1:

Just because you don't collect anything explicitly doesn't mean that nothing is collected. Since the problem occurs during a join, the most likely explanation is that the execution plan uses a broadcast join. In that case Spark first collects the broadcast side to the driver and then broadcasts it to the executors.

Depending on the configuration and pipeline:

  • Make sure that spark.sql.autoBroadcastJoinThreshold is smaller than spark.driver.maxResultSize (a quick check is sketched right after this list).
  • Make sure you don't force a broadcast join on data of unknown size.
  • While nothing indicates it is the problem here, be careful when using Spark ML utilities. Some of them (most notably indexers) can bring significant amounts of data to the driver.
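
For instance, a quick sanity check of the two limits might look like this (a minimal sketch, assuming an existing SparkSession named spark; "1g" below is only Spark's default fallback, not a recommendation):

// Minimal sketch: print both limits so you can confirm the broadcast threshold
// stays well below the driver's result-size limit.
val broadcastThreshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
val maxResultSize = spark.sparkContext.getConf.get("spark.driver.maxResultSize", "1g") // "1g" is the default
println(s"autoBroadcastJoinThreshold = $broadcastThreshold, maxResultSize = $maxResultSize")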

To determine if broadcasting is indeed the problem, check the execution plan and, if needed, remove broadcast hints and disable automatic broadcasts:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
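
To see which join strategy the planner actually chose, a sketch like the following helps (assuming the joined DataFrame from the question):

// Inspect the physical plan before triggering the job.
joined.explain()
// Look for BroadcastExchange, BroadcastHashJoin or BroadcastNestedLoopJoin nodes:
// their presence means one side of the join is collected to the driver first.
// After disabling automatic broadcasts with the setting above, run explain() again
// to confirm which strategy the planner picks now.

Note that a pure range condition like the one in the question has no equi-join keys, so Spark will typically fall back to a broadcast nested loop join or a cartesian product here.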


Answer 2:

In theory, this exception is not always related to user data.

Metadata about each task's execution result is sent to the driver node in serialized form, and the accumulated size of that metadata can exceed the threshold.

Proof: the error message is built in org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults:

val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +

This method is called from org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask:

val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
  case directResult: DirectTaskResult[_] =>
    if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
      return
    }

If the number of tasks is huge, this exception can occur even though no user data is explicitly collected.
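
If that is the cause, the usual workarounds are to raise the limit or to reduce the number of tasks per stage. A minimal sketch, where the concrete values ("4g", "2000") and the app name are illustrative assumptions rather than tuned recommendations:

// Sketch of two workarounds applied at session creation time.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("large-range-join")                      // hypothetical app name
  // Option 1: give the driver more room for accumulated task-result metadata
  // ("0" would remove the limit entirely, at the risk of driver OOM).
  .config("spark.driver.maxResultSize", "4g")
  // Option 2: fewer tasks per shuffle stage means less result metadata sent to the driver.
  .config("spark.sql.shuffle.partitions", "2000")
  .getOrCreate()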