Spark v3.0.0 - WARN DAGScheduler: broadcasting large task binary with size xx

I'm new to Spark. I'm coding a machine learning algorithm in Spark standalone (v3.0.0) with these configurations set:

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf();
conf.setMaster("local[*]");
// Note: in local mode the driver JVM has already started by this point,
// so spark.driver.memory set here has no effect; pass it at launch
// (e.g. spark-submit --driver-memory 8g) instead.
conf.set("spark.driver.memory", "8g");
conf.set("spark.driver.maxResultSize", "8g");
conf.set("spark.memory.fraction", "0.6");
conf.set("spark.memory.storageFraction", "0.5");
conf.set("spark.sql.shuffle.partitions", "5");
conf.set("spark.memory.offHeap.enabled", "false");
conf.set("spark.reducer.maxSizeInFlight", "96m");
conf.set("spark.shuffle.file.buffer", "256k");
conf.set("spark.sql.debug.maxToStringFields", "100");

This is how I create the CrossValidator:

ParamMap[] paramGrid = new ParamGridBuilder()
        .addGrid(gbt.maxBins(), new int[]{50})
        .addGrid(gbt.maxDepth(), new int[]{2, 5, 10})
        .addGrid(gbt.maxIter(), new int[]{5, 20, 40})
        .addGrid(gbt.minInfoGain(), new double[]{0.0d, .1d, .5d})
        .build();

CrossValidator gbcv = new CrossValidator()
        .setEstimator(gbt)
        .setEstimatorParamMaps(paramGrid)
        .setEvaluator(gbevaluator)
        .setNumFolds(5)
        .setParallelism(8)
        .setSeed(session.getArguments().getTrainingRandom());

The problem is that when (in paramGrid) maxDepth is just {2, 5} and maxIter just {5, 20}, everything works fine. But with the grid shown above, it keeps logging `WARN DAGScheduler: broadcasting large task binary with size xx`, with xx growing from 1000 KiB to 2.9 MiB, often ending in a timeout exception. Which Spark parameters should I change to avoid this?
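For context on why the larger grid makes the warning worse: CrossValidator fits one model per parameter combination per fold, and deeper, longer-boosted GBT models serialize into larger task binaries. A quick plain-Java sketch of the fit count for the grid above (no Spark needed, just the grid arithmetic):

```java
public class GridSize {
    public static void main(String[] args) {
        // Grid dimensions taken from the ParamGridBuilder above
        int maxBinsOptions = 1;     // {50}
        int maxDepthOptions = 3;    // {2, 5, 10}
        int maxIterOptions = 3;     // {5, 20, 40}
        int minInfoGainOptions = 3; // {0.0, 0.1, 0.5}
        int numFolds = 5;

        int combos = maxBinsOptions * maxDepthOptions * maxIterOptions * minInfoGainOptions;
        int fits = combos * numFolds;
        System.out.println(combos + " parameter combinations, " + fits + " model fits");
        // 27 parameter combinations, 135 model fits
    }
}
```

So trimming the grid back to {2, 5} x {5, 20} cuts the work from 135 fits to 60, which matches why the smaller grid "just works".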

False answered 2/9, 2020 at 10:52
Consider increasing the number of partitions so that your tasks are lightweight and each task handles less data. Check this: mail-archives.us.apache.org/mod_mbox/spark-user/201407.mbox/… – Davy
Hi @vittoema96. How did you resolve it in the end? – Electoral

For the timeout issue, consider changing the following configuration:

spark.sql.autoBroadcastJoinThreshold to -1.

This removes the broadcast size limit for joins, which defaults to 10 MB.
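A minimal sketch of applying that setting in Java (the configuration key is real; the session builder and app name here are illustrative, and Spark SQL is assumed to be on the classpath):

```java
import org.apache.spark.sql.SparkSession;

public class DisableAutoBroadcast {
    public static void main(String[] args) {
        // Setting the threshold to -1 disables automatic broadcast joins
        // (the default threshold is 10 MB).
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("disable-auto-broadcast") // hypothetical app name
                .config("spark.sql.autoBroadcastJoinThreshold", "-1")
                .getOrCreate();

        // The same setting can also be changed on a running session:
        spark.conf().set("spark.sql.autoBroadcastJoinThreshold", "-1");

        spark.stop();
    }
}
```

Note the trade-off raised in the comments below: this disables broadcast joins entirely, so joins that previously benefited from broadcasting may fall back to slower shuffle joins.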

Honniball answered 21/12, 2020 at 7:44
By setting this value to -1, broadcasting can be disabled. From docs. – Fullblooded
Is it advisable to disable broadcast? – Alber

The solution that worked for me was reducing the task size, i.e. reducing the amount of data each task handles.

First, check the number of partitions in the dataframe via df.rdd.getNumPartitions(). Then increase the number of partitions, e.g. df.repartition(100).
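When choosing a new partition count, a common rule of thumb (an assumption on my part, not something this answer specifies) is to target roughly 128 MB of data per partition, with at least a couple of partitions per core. The arithmetic can be sketched in plain Java:

```java
public class PartitionHeuristic {
    // Heuristic (not a Spark API): aim for ~128 MB per partition,
    // but never fewer than 2 partitions per available core.
    static int suggestPartitions(long dataSizeBytes, int numCores) {
        long targetBytesPerPartition = 128L * 1024 * 1024;
        long bySize = (dataSizeBytes + targetBytesPerPartition - 1) / targetBytesPerPartition; // ceil
        return (int) Math.max(bySize, 2L * numCores);
    }

    public static void main(String[] args) {
        // e.g. a 10 GB dataset on an 8-core machine
        System.out.println(suggestPartitions(10L * 1024 * 1024 * 1024, 8));
        // -> 80 (10 GB / 128 MB per partition)
    }
}
```

The result would then be passed to df.repartition(n) in place of the hard-coded 100 above.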

Grape answered 14/5, 2021 at 16:11

I got a similar warning: WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.2 MiB. What worked for me was increasing the machine configuration from 2 vCPU / 7.5 GB RAM to 4 vCPU / 15 GB RAM (some Parquet files were created but the job never completed), and then to 8 vCPU / 32 GB RAM, after which everything worked. This was on GCP Dataproc.

Epizoic answered 6/3, 2022 at 6:4
Hi @Bluelily. I am running into the same problem: WARN DAGScheduler: Broadcasting large task binary with size #### MiB. I read the training-set Parquet file from Google Drive and perform the training in a Vertex AI Workbench Jupyter notebook. How do we increase the machine configuration for a Vertex AI Workbench Jupyter notebook so it can complete training? Notebook snapshot. Thanks in advance. – Electoral
Can you check the memory/CPU allocated to you, and whether there is a way to increase it to the next capacity? I used Dataproc in a previous project, so I was able to upgrade those resources manually. Hope this gives you insight on what to do next. – Epizoic
Thanks @Bluelily. The training finally finished after I changed the "Machine type" to 8 vCPUs, 30 GB RAM on the Vertex AI Workbench page, though WARN DAGScheduler: Broadcasting large task binary with size #### MiB still shows. – Electoral
