Coalesce reduces parallelism of entire stage (spark)

Sometimes Spark "optimizes" a dataframe plan in an inefficient way. Consider the following example in Spark 2.1 (can also be reproduced in Spark 1.6):

import org.apache.spark.sql.functions.udf
import spark.implicits._ // for .toDF; pre-imported in spark-shell

val df = sparkContext.parallelize((1 to 500).map(i => scala.util.Random.nextDouble), 100).toDF("value")

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})

val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))

df_result
.coalesce(1)
.write
.saveAsTable(tablename)

In this example I want to write a single file after an expensive transformation of a dataframe (this is just an example to demonstrate the issue). Spark moves the coalesce(1) up the plan, so the UDF is applied to a dataframe containing only 1 partition, destroying parallelism (interestingly, repartition(1) does not behave this way).

To generalize, this behavior occurs when I want to increase parallelism in a certain part of my transformation, but decrease parallelism thereafter.

I've found one workaround, which consists of caching the dataframe and then triggering its complete evaluation:

val df = sparkContext.parallelize((1 to 500).map(i => scala.util.Random.nextDouble), 100).toDF("value")

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})

val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
.cache

df_result.rdd.count // trigger computation

df_result
.coalesce(1)
.write
.saveAsTable(tablename)

My question is: is there another way to tell Spark not to decrease parallelism in such cases?

Derron answered 12/6, 2017 at 8:24 Comment(4)
In a nutshell, you want to instantiate an RDD with 500 partitions, then instantiate another one to merge the results into just 1 partition so that you can save it into a single file -- cf. #31384404 >> wild guess: maybe a simple call to getNumPartitions() would be sufficient to force instantiation, without having to actually scan the result with count()... – Censor
@SamsonScharfrichter no, calling getNumPartitions() is not sufficient and does not prevent the coalesce from being "pushed up" – Derron
Coincidence: I just stumbled on that presentation, from the recent Spark Summit > slideshare.net/databricks/… – Censor
This seems a very contrived example, is there a real case where you've experienced this? How about using something like reduceByKey after the UDF execution to break it up. – Exaggerative

Actually, it is not caused by SparkSQL's optimization; SparkSQL doesn't change the position of the Coalesce operator, as the executed plan shows:

Coalesce 1
+- *Project [value#2, UDF(value#2) AS udfResult#11]
   +- *SerializeFromObject [input[0, double, false] AS value#2]
      +- Scan ExternalRDDScan[obj#1]
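
For reference, this plan can be inspected directly; a minimal check, assuming the df_result defined in the question:

df_result.coalesce(1).explain() // prints the physical plan above: Coalesce 1 sits on top, and no Exchange (shuffle) node is added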

I quote a paragraph from the coalesce API's description:

Note: this paragraph was added by SPARK-19399, so it is not found in the 2.0 API docs.

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

The coalesce API doesn't perform a shuffle; instead it creates a narrow dependency between the previous RDD and the current RDD. Since RDDs are evaluated lazily, the computation is actually performed on the already-coalesced partitions.
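
The same distinction is visible at the RDD level; a small sketch, assuming a spark-shell session where sc is the SparkContext:

val rdd = sc.parallelize(1 to 500, 100)
// coalesce(1) creates only a narrow dependency: no shuffle, so the upstream work collapses into a single task
println(rdd.coalesce(1).toDebugString)
// repartition(1) inserts a ShuffledRDD: the 100 upstream partitions run as their own stage before the merge
println(rdd.repartition(1).toDebugString)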

To prevent this, you should use the repartition API instead.
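
Applied to the example from the question, a minimal sketch of that fix: because repartition introduces a shuffle, the UDF still runs on all 100 upstream partitions and only the shuffled result is written as a single partition.

df_result
.repartition(1)
.write
.saveAsTable(tablename)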

Sunrise answered 27/11, 2017 at 5:28 Comment(5)
Please could you elaborate on the cited paragraph @viirya? – Scimitar
Is this paragraph suggesting that the "computation taking place on fewer nodes" is the UDF execution? It doesn't appear so from the execution plan, unless you are suggesting that the coalesce reduces the number of upstream nodes, which seems illogical. – Exaggerative
Thank you. This is REALLY helpful. All I knew was "coalesce() is more efficient for reducing partitions and repartition() is faster for increasing them." Now I actually understand the consequences - pushing repartitions up in my script has given me headaches in the past! – Godhood
Honest question: is there well and truly a valid reason to have both coalesce and repartition in modern Spark (3.0+)? Given the end result is intended to be the same, it makes sense to me that Spark should decide under the hood which operation to call rather than leaving largely misunderstood footguns lying around. – Slavin
Do I understand it correctly: even though the execution plan showed that coalesce(1) was performed last, it was actually performed BEFORE the transformation which added a column? – Selsyn
