I'm not sure which version of Spark you are working with, but you could try setting spark.sql.adaptive.coalescePartitions.minPartitionNum to some value; as a starting point, try the same value you use for spark.sql.shuffle.partitions.
My hope is that with this setting you get both: automatic coalescing of small partitions plus skew handling by AQE, while in cases where there isn't much to do Spark will still try to keep at least the number of partitions given by spark.sql.adaptive.coalescePartitions.minPartitionNum.
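For example (just a sketch, assuming an existing SparkSession called `spark` and the default 200 shuffle partitions; adjust the numbers to your job):

```scala
// Enable AQE coalescing but keep a floor on the resulting partition count.
// Assumes an existing SparkSession named `spark`.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// Start with the same value as spark.sql.shuffle.partitions (200 is the default).
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "200")
```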
At the moment I don't see any other way to force Spark to calculate this dynamically so that partitions stay below a given size, for example 100 MB.
Why I think this may change something:
Here is the description of this parameter:
```scala
val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
  buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
    .internal()
    .doc("(deprecated) The suggested (not guaranteed) minimum number of shuffle partitions " +
      "after coalescing. If not set, the default value is the default parallelism of the " +
      "Spark cluster. This configuration only has an effect when " +
      s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
      s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
    .version("3.0.0")
    .intConf
    .checkValue(_ > 0, "The minimum number of partitions must be positive.")
    .createOptional
```
So it is optional. Now let's check where it is used in the Spark code:
```scala
// Ideally, this rule should simply coalesce partitions w.r.t. the target size specified by
// ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this
// rule by default tries to maximize the parallelism and set the target size to
// `total shuffle size / Spark default parallelism`. In case the `Spark default parallelism`
// is too big, this rule also respect the minimum partition size specified by
// COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
// For history reason, this rule also need to support the config
// COALESCE_PARTITIONS_MIN_PARTITION_NUM. We should remove this config in the future.
val minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse {
  if (conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)) {
    // We fall back to Spark default parallelism if the minimum number of coalesced partitions
    // is not set, so to avoid perf regressions compared to no coalescing.
    session.sparkContext.defaultParallelism
  } else {
    // If we don't need to maximize the parallelism, we set `minPartitionNum` to 1, so that
    // the specified advisory partition size will be respected.
    1
  }
}
```
It looks like when this parameter is not set and spark.sql.adaptive.coalescePartitions.parallelismFirst is true (which is the default), Spark chooses the default parallelism as minNumPartitions. That is probably why you see a number of partitions equal to the number of cores, as you mentioned.
If I understand it correctly, setting spark.sql.adaptive.coalescePartitions.minPartitionNum should do the trick and give you more control over the number of partitions.
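Alternatively, going by the comment in the snippet above, another option (an assumption on my side that I haven't benchmarked, and it requires a Spark version that has the parallelismFirst config) is to take the else branch: disable parallelismFirst, leave minPartitionNum unset, and let the advisory partition size be respected, which would match your 100 MB target:

```scala
// Take the "respect the advisory size" branch from the code above:
// with parallelismFirst disabled and minPartitionNum unset, AQE coalesces
// towards the advisory partition size instead of maximizing parallelism.
// Assumes a SparkSession named `spark`; 100m matches the example target size.
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "100m")
```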
If that does not help, or you expect something else, you can experiment with the other spark.sql.adaptive parameters and check how they are used in the source code.
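For example, you can quickly list which adaptive configs are available and currently in effect in your version (again assuming a SparkSession named `spark`):

```scala
// List all spark.sql.adaptive.* configs with their current values and docs.
spark.sql("SET -v")
  .where("key LIKE 'spark.sql.adaptive%'")
  .show(100, truncate = false)
```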
I think that this blog post may be a good starting point.