I'm running a Spark job to aggregate data. I have a custom data structure called a Profile, which basically contains a mutable.HashMap[Zone, Double]. I want to merge all profiles that share a given key (a UUID), with the following code:
def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
.aggregateByKey(new Profile(), 3200)(merge, merge).cache()
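In case it helps, here is roughly what Profile looks like; the Zone type, field names, and addWeights body below are simplified placeholders, not the real class:

import scala.collection.mutable

// Stripped-down sketch of the Profile described above (assumed shape, not the actual class)
case class Zone(id: String)

class Profile extends Serializable {
  val weights: mutable.HashMap[Zone, Double] = mutable.HashMap.empty[Zone, Double]

  // Merges the other profile's weights into this one, in place
  def addWeights(other: Profile): Unit =
    other.weights.foreach { case (zone, w) =>
      weights(zone) = weights.getOrElse(zone, 0.0) + w
    }
}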
Curiously, Spark fails with the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
The obvious solution is to increase "spark.driver.maxResultSize", but two things puzzle me:
- It seems like too much of a coincidence that the reported result size (1024.0 MB) is exactly equal to the configured limit (1024.0 MB).
- All the documentation and help I found while googling this particular error and configuration parameter indicates that it affects functions that bring a value back to the driver (say take() or collect()), but I'm not bringing ANYTHING back to the driver; I'm just reading from HDFS, aggregating, and saving back to HDFS.
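For reference, my understanding is that raising the limit would look roughly like this (the "2g" value is only an example; the default is 1g and "0" removes the limit):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("profile-aggregation")        // placeholder app name
  .set("spark.driver.maxResultSize", "2g")

// or, equivalently, on the command line:
//   spark-submit --conf spark.driver.maxResultSize=2g ...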
Does anyone know why I'm getting this error?