I have a Spark application that performs a large join
val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
and then aggregates the resulting DataFrame down to one with maybe 13k rows. In the course of the join, the job fails with the following error message:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)
This was happening before I had set spark.driver.maxResultSize at all, so I set spark.driver.maxResultSize=2G. Then I made a slight change to the join condition, and the error resurfaced.
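For reference, a minimal sketch of how that limit can be raised programmatically; the app name below is illustrative, and the same value can equally be passed as --conf spark.driver.maxResultSize=2g on spark-submit.

import org.apache.spark.sql.SparkSession

// Illustrative only: raising spark.driver.maxResultSize on the session itself.
// A value of "0" removes the limit entirely.
val spark = SparkSession.builder()
  .appName("large-join-job")                  // hypothetical app name
  .config("spark.driver.maxResultSize", "2g")
  .getOrCreate()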
Edit: In resizing the cluster, I also doubled the number of partitions the DataFrame assumes, from a .coalesce(256) to a .coalesce(512), so I can't be sure the error isn't caused by that.
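Roughly, that change looks like the following sketch (the variable name is illustrative):

// Before the resize the DataFrame was coalesced to 256 partitions ahead of the
// join; after the resize that was doubled to 512, which also increases the
// number of task results reported back to the driver.
val dfCoalesced = df.coalesce(512)   // previously df.coalesce(256)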
My question is: since I am not collecting anything to the driver, why should spark.driver.maxResultSize matter at all here? Is the driver's memory being used for something in the join that I'm not aware of?
The val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date") line won't run any job on its own. You must be doing some action that triggers the job. – Trysail

It's triggered by a df.write.csv(...). – Terrarium
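For context, a hedged sketch of that kind of action; the DataFrame name, output path, and write mode here are hypothetical:

// The join and aggregation are lazy transformations; a write like this is the
// action that actually launches the job and all of its tasks.
result.write                 // hypothetical name for the aggregated DataFrame
  .mode("overwrite")         // hypothetical option
  .csv("/tmp/output")        // hypothetical path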
How many partitions does joined end up with? Something like joined.queryExecution.toRdd.getNumPartitions. I'm curious why you had 78021 tasks. Could it be that a better solution is to lower the number of partitions for the datasets in the join? – Champagne
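Following that suggestion, a quick sketch of how the partition counts could be checked:

// Partitions (and hence tasks) produced by the joined DataFrame.
val joinedParts = joined.queryExecution.toRdd.getNumPartitions
// Partition counts of the inputs, for comparison.
val dfParts = df.rdd.getNumPartitions
val dateParts = uniqueDates.rdd.getNumPartitions
println(s"joined=$joinedParts, df=$dfParts, uniqueDates=$dateParts")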
Can you share the execution plan (df.explain)? – Champagne

It's a FileScanRDD followed by a MapPartitionsRDD, then a lot of UnionRDDs, and finally a distinct operation on the result of all the unions. But no (broadcast) joins or collect... I can of course see why this execution plan is not ideal, but not where spark.driver.maxResultSize comes in. When --deploy-mode cluster is set, there is no crash. – Terrarium
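A minimal sketch of how that plan information can be pulled out, assuming the joined DataFrame from the question:

// Prints the logical and physical plans (Union, FileScan, aggregate nodes, ...).
joined.explain(true)
// The underlying RDD lineage, where names such as FileScanRDD,
// MapPartitionsRDD, and UnionRDD appear.
println(joined.queryExecution.toRdd.toDebugString)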