How to configure Spark to adjust the number of output partitions after a join or groupby?
Asked Answered
U

2

9

I know you can set spark.sql.shuffle.partitions and spark.sql.adaptive.advisoryPartitionSizeInBytes. The former will not work with adaptive query execution, and the latter only works for the first shuffle for some reason, after which it just uses the default number of partitions, i.e. #cores.

Is there a way to configure AQE to adjust the number of partitions such that each partition is no more than 100MB?

Underplot answered 19/5, 2023 at 0:2 Comment(7)
After this join/group by your partitions are skewed or are just really big? I think that AQE is not going to automatically split large partition if it is not identified as skewed (so larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes(256 by default) and 5 x (spark.sql.adaptive.skewJoin.skewedPartitionFactor) larger than median partition sizePigpen
@Pigpen It's both, but mostly just really big, 500 to 800MBUnderplot
So probably it is as i described, AQE is not dealing with big partitions but only with skewed partitions or those which are to small (coalesce after shuffle)Pigpen
You are right, I understand why this is happening. I'm just wondering if there is a way to solve this automatically.Underplot
as per my understanding there isn't a direct configuration in Spark to dynamically adjust the number of output partitions based on a target partition size. if you want you can try repartition df using custom number of partitions calculated based on the size required for each partitionFragonard
Yes, that's what I'm doing already. That's why I'm trying to find how to automate this.Underplot
there was no easy way of doing this so we were passing the total_size as parameter in the workflow and run this experiment multiple times to actually get the desired level of partitionsFragonard
P
2

Not sure on which version of Spark you are working but you may try to set spark.sql.adaptive.coalescePartitions.minPartitionNum to some value, for the beggining you may try with same value as for sql.shuffle.partitions

My hope is that with this setting you will have both - automatic coalesce for small partitions + handling of skew by aqe but when there want be much to do it will try to keep minimum number of partitions from spark.sql.adaptive.coalescePartitions.minPartitionNum

For this moment i don't see any other way to force spark to calculate it dynamically to keep partitions for example not bigger than 100 mb

Why i think that it may change something:

Here is description of this parameter:

  val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
    buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
      .internal()
      .doc("(deprecated) The suggested (not guaranteed) minimum number of shuffle partitions " +
        "after coalescing. If not set, the default value is the default parallelism of the " +
        "Spark cluster. This configuration only has an effect when " +
        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
        s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
      .version("3.0.0")
      .intConf
      .checkValue(_ > 0, "The minimum number of partitions must be positive.")
      .createOptional

So it is optional, now lets check where it is used Spark code:

// Ideally, this rule should simply coalesce partitions w.r.t. the target size specified by
// ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this
// rule by default tries to maximize the parallelism and set the target size to
// `total shuffle size / Spark default parallelism`. In case the `Spark default parallelism`
// is too big, this rule also respect the minimum partition size specified by
// COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
// For history reason, this rule also need to support the config
// COALESCE_PARTITIONS_MIN_PARTITION_NUM. We should remove this config in the future.
val minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse {
  if (conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)) {
    // We fall back to Spark default parallelism if the minimum number of coalesced partitions
    // is not set, so to avoid perf regressions compared to no coalescing.
    session.sparkContext.defaultParallelism
  } else {
    // If we don't need to maximize the parallelism, we set `minPartitionNum` to 1, so that
    // the specified advisory partition size will be respected.
    1
  }
}

It looks like when this parameter is not set and spark.sql.adaptive.coalescePartitions.parallelismFirst is set to true (which is true by default) Spark is choosing default parallelism as minPartitionNum. Probably thats the reason why you see number of partitions equals to number of cores as you mentioned

If i understand it correctly if you set spark.sql.adaptive.coalescePartitions.minPartitionNum it should do the trick and allow you to have more control over partitions.

If it is not going to help or you expect something else you can try to experiment with other sql.adaptive parameters and check how they are used in source code.

I think that this blog post may be a good starting point

Pigpen answered 22/5, 2023 at 19:5 Comment(2)
Thank you for the answer. Unfortunately, from looking at the code, I believe it will not help if the number of partitions is already small, because it will only coalesce (reduce) partitions: github.com/apache/spark/blob/master/sql/core/src/main/scala/org/…Underplot
Yes, after much research on my side as well, it seems like there is no good way of handling it automatically. Very unfortunate, because it makes it harder to scale the amount of data or the size of a cluster independently.Underplot
S
0

What version of spark are you using You might be encountering this bug.

Which I believe is better explained via this bug.(Sounds closer to your issue.)

But this all only holds true if you are on spark < 3.3

Seascape answered 25/5, 2023 at 13:54 Comment(1)
I am using 3.3.0. The issue in not with skew partitions, I'm asking how to set the number of post-shuffle partitions so that each partition is roughly of the desired size.Underplot

© 2022 - 2024 — McMap. All rights reserved.