Exceeding `spark.driver.maxResultSize` without bringing any data to the driver

I have a Spark application that performs a large join

val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")

and then aggregates the resulting DataFrame down to one with maybe 13k rows. In the course of the join, the job fails with the following error message:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)

This was happening before I set spark.driver.maxResultSize at all, so I set spark.driver.maxResultSize=2G. Then I made a slight change to the join condition, and the error resurfaced.
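
For reference, I applied the limit roughly along these lines (a sketch; the app name is just a placeholder, and the same thing can be passed as --conf spark.driver.maxResultSize=2g to spark-submit):

import org.apache.spark.sql.SparkSession

// Sketch only: spark.driver.maxResultSize is read when the driver starts,
// so it must be set before the SparkSession/SparkContext is created.
val spark = SparkSession.builder()
  .appName("large-join")                        // hypothetical app name
  .config("spark.driver.maxResultSize", "2g")
  .getOrCreate()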

Edit: In resizing the cluster, I also doubled the number of partitions the DataFrame is coalesced to, from .coalesce(256) to .coalesce(512), so I can't be sure the problem isn't caused by that.

My question is, since I am not collecting anything to the driver, why should spark.driver.maxResultSize matter at all here? Is the driver's memory being used for something in the join that I'm not aware of?

Randee answered 13/3, 2017 at 22:49 Comment(9)
I have the same issue; do you have any progress here?Stoughton
@ValentinP. For my job, I simply increased the option to 3G and it worked. This still doesn't answer the question of what this parameter does and why it's necessary when --deploy-mode client is the setup.Randee
@Randee Can you paste the actual Scala code which you are running? The line val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date") won't run any job by itself. You must be doing some action which triggers the job.Trysail
I've had this issue after doing a df.write.csv(...).Terrarium
Could you check how many partitions you have in joined? Something like joined.queryExecution.toRdd.getNumPartitions. I'm curious why you had 78021 tasks. Could it be that a better solution is to lower the number of partitions for the datasets in the join?Champagne
Could you also attach the physical query plan (from web UI or df.explain)?Champagne
@JacekLaskowski Unfortunately, I don't have the code for this project anymore, and it's been so long that I forgot most of what it was about anyway. Sorry, but thanks for the renewed interest in the question.Randee
@Terrarium Could you shed more light on your case that seems similar? Could you edit the question and throw in more info? Thanks.Champagne
@JacekLaskowski I can't show the query plan here, but the stage where it crashes consists of over 3000 tasks. It's a lot of FileScanRDDs followed by MapPartitionsRDDs, then a lot of UnionRDDs, and finally a distinct operation on the result of all the unions. But no (broadcast) joins or collect... I can of course see why this execution plan is not ideal, but not where spark.driver.maxResultSize comes in. When --deploy-mode cluster is set there is no crash.Terrarium

Just because you don't collect anything explicitly doesn't mean that nothing is collected. Since the problem occurs during a join, the most likely explanation is that the execution plan uses a broadcast join. In that case Spark will first collect the data to the driver and then broadcast it.
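
For illustration (a minimal sketch, not the asker's actual code), an explicit broadcast hint makes this collect-then-broadcast step visible:

import org.apache.spark.sql.functions.broadcast

// Assumes an existing SparkSession named `spark` and the df / uniqueDates
// DataFrames from the question. The hinted side is first collected to the
// driver and then shipped to the executors; if that collected data exceeds
// spark.driver.maxResultSize, the job fails with the quoted error even though
// user code never calls collect().
import spark.implicits._
val joined = df.join(broadcast(uniqueDates),
  $"start_date" <= $"date" && $"date" <= $"end_date")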

Depending on the configuration and pipeline:

  • Make sure that spark.sql.autoBroadcastJoinThreshold is smaller than spark.driver.maxResultSize.
  • Make sure you don't force a broadcast join on data of unknown size.
  • While nothing indicates it is the problem here, be careful when using Spark ML utilities. Some of these (most notably indexers) can bring significant amounts of data to the driver.

To determine if broadcasting is indeed the problem, please check the execution plan and, if needed, remove broadcast hints and disable automatic broadcasts:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
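
For example (a sketch reusing the DataFrames from the question), look for BroadcastHashJoin or BroadcastNestedLoopJoin nodes in the physical plan, then re-plan with automatic broadcasts disabled:

// Inspect the physical plan for Broadcast* join nodes
joined.explain()

// Disable automatic broadcasts, then rebuild the DataFrame so it is re-planned
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val joinedNoBroadcast =
  uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
joinedNoBroadcast.explain()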
Pachydermatous answered 13/3, 2017 at 22:49 Comment(3)
I suppose Spark should be smart enough not to use a broadcast join that exceeds maxResultSize unless you explicitly tell it to or unless you stupidly change autoBroadcastJoinThreshold to a higher value?Terrarium
How do you disable automatic broadcasts?Whoops
@Whoops setting spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) disables automatic broadcastsCharmain

In theory, this exception is not always related to the user's data.

Technical information about task execution results is sent to the driver node in serialized form, and its total size can exceed the threshold.

Proof: the error message is produced in org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults:

val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +

This method is called in org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask:

val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
  case directResult: DirectTaskResult[_] =>
    if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
      return
    }

If the number of tasks is huge, this exception can occur even without any explicit collect.
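
If that is the case, one possible mitigation (a sketch, reusing the DataFrames from the question) is simply to reduce the number of tasks, since every task contributes its serialized result metadata (status, accumulator updates) to the driver-side total:

// Fewer shuffle partitions -> fewer tasks -> less serialized result metadata
// accumulating on the driver; 78021 tasks is far more than a ~13k-row result needs.
spark.conf.set("spark.sql.shuffle.partitions", "512")

val joined = uniqueDates
  .join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
  .coalesce(512)   // also cap the number of output partitions, as in the question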

Georginegeorglana answered 18/9, 2018 at 15:30 Comment(2)
I'm pretty sure you are on to something, but could you expand on what that technical information is and how that piece of code proves it? I am not extremely familiar with Spark's internals, so to me it looks like the purpose of TaskResultGetter is returning the actual results of the computations that a task performed to the driver. Which should only be necessary for actions like collect, take, perhaps count, ...Terrarium
How to check: just run a small join in debug mode locally, set a breakpoint at the place mentioned above, and some technical information shows up (at least some accumulators)Georginegeorglana
