Here's the solution I came up with:
/**
 * Expression that produces a negative random number between -1 and -`lowestValue` (inclusive).
 *
 * @example
 * {{{
 * spark
 *   .range(1, 100)
 *   .withColumn("negative", negativeRandomWithin(3))
 *   .select("negative")
 *   .distinct()
 *   .show(false)
 * }}}
 * +--------+
 * |negative|
 * +--------+
 * |-2      |
 * |-3      |
 * |-1      |
 * +--------+
 */
private[transformation] def negativeRandomWithin(lowestValue: Long): Column = {
  negate(positiveRandomWithin(lowestValue)) - 1
}
/**
 * Expression that produces a positive random number between 0 (inclusive) and `highestValue` (exclusive).
 *
 * @example
 * {{{
 * spark
 *   .range(1, 100)
 *   .withColumn("positive", positiveRandomWithin(3))
 *   .select("positive")
 *   .distinct()
 *   .show(false)
 * }}}
 * +--------+
 * |positive|
 * +--------+
 * |0       |
 * |1       |
 * |2       |
 * +--------+
 */
private[transformation] def positiveRandomWithin(highestValue: Long): Column = {
  pmod((rand * highestValue).cast(LongType), lit(highestValue))
}
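Since `rand` is uniform in [0, 1), the arithmetic can be checked without Spark. Below is a plain-Scala analogue of the two expressions (the object and method names are mine, purely for illustration); `Math.floorMod` plays the role of Spark's `pmod`, always returning a non-negative result for a positive divisor:

```scala
import scala.util.Random

object RandomRangeSketch {
  // Analogue of positiveRandomWithin: a uniform Long in [0, highestValue).
  // Math.floorMod mirrors Spark's pmod: unlike %, its result takes the
  // sign of the divisor, so it can never be negative here.
  def positiveRandomWithin(highestValue: Long): Long =
    Math.floorMod((Random.nextDouble() * highestValue).toLong, highestValue)

  // Analogue of negativeRandomWithin: negate and shift by one,
  // mapping [0, lowestValue) onto [-lowestValue, -1].
  def negativeRandomWithin(lowestValue: Long): Long =
    -positiveRandomWithin(lowestValue) - 1

  def main(args: Array[String]): Unit = {
    val samples = Seq.fill(1000)(negativeRandomWithin(3))
    // Every sample falls inside the documented range.
    assert(samples.forall(v => v >= -3 && v <= -1))
    println(samples.distinct.sorted.mkString(", "))
  }
}
```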
implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {

  /**
   * An optimized version of a left outer join for the case where the left side's join column is heavily skewed toward `null`.
   *
   * @note It works only for a single-column join to which `isNotNull` is applicable.
   *
   * Optimization algorithm:
   *   1. Replace the left dataset's `null` values with negative numbers in the range from -1 to -`nullNumBuckets` (10000 by default).
   *   2. Use the appended column, containing the original join-column values plus the `null` replacements, as the join column of the left dataset.
   *      The appended column's name is built from the original left join column and `skewedColumnPostFix`, separated by an underscore.
   *
   * @note There is no check of how many `null` values the left dataset contains before applying the steps above,
   *       nor any check of whether the join is a sort-merge join or a broadcast join.
   *
   * IMPORTANT: if the left dataset already contains the appended column, it is reused, to benefit from left-side data that is already repartitioned.
   *
   * HIGHLY IMPORTANT: the right dataset must not contain negative values in `joinRightCol`.
   */
  private[transformation] def nullSkewLeftJoin(right: DataFrame,
                                               joinLeftCol: Column,
                                               joinRightCol: Column,
                                               skewedColumnPostFix: String = "skewed_column",
                                               nullNumBuckets: Int = 10000): DataFrame = {
    val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"

    if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
      underlying.join(right.where(joinRightCol.isNotNull),
        col(skewedTempColumn) === joinRightCol, "left")
    } else {
      underlying
        .withColumn(skewedTempColumn,
          when(joinLeftCol.isNotNull, joinLeftCol).otherwise(negativeRandomWithin(nullNumBuckets)))
        .join(right.where(joinRightCol.isNotNull),
          col(skewedTempColumn) === joinRightCol, "left")
    }
  }
}
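For completeness, here is a runnable end-to-end sketch (my own demo harness, not part of the answer's API): the helpers are repeated without the `private[transformation]` modifier so the file compiles standalone, and a local `SparkSession` with made-up `user_id`/`tag` columns exercises the join. Since the right side has no negative ids, the `null`-key rows simply find no match:

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

object NullSkewJoinDemo {
  def positiveRandomWithin(highestValue: Long): Column =
    pmod((rand * highestValue).cast(LongType), lit(highestValue))

  def negativeRandomWithin(lowestValue: Long): Column =
    negate(positiveRandomWithin(lowestValue)) - 1

  implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {
    def nullSkewLeftJoin(right: DataFrame,
                         joinLeftCol: Column,
                         joinRightCol: Column,
                         skewedColumnPostFix: String = "skewed_column",
                         nullNumBuckets: Int = 10000): DataFrame = {
      val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"
      if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
        underlying.join(right.where(joinRightCol.isNotNull),
          col(skewedTempColumn) === joinRightCol, "left")
      } else {
        underlying
          .withColumn(skewedTempColumn,
            when(joinLeftCol.isNotNull, joinLeftCol)
              .otherwise(negativeRandomWithin(nullNumBuckets)))
          .join(right.where(joinRightCol.isNotNull),
            col(skewedTempColumn) === joinRightCol, "left")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("null-skew-demo").getOrCreate()
    import spark.implicits._

    // Left side: join key heavily skewed toward null.
    val left  = Seq[java.lang.Long](1L, null, null, 2L, null).toDF("user_id")
    val right = Seq((1L, "a"), (2L, "b")).toDF("id", "tag")

    val joined = left.nullSkewLeftJoin(right, left("user_id"), right("id"), nullNumBuckets = 100)

    // All 5 left rows survive the left join; the 3 null-key rows
    // get no match because the right side has no negative ids.
    assert(joined.count() == 5)
    assert(joined.filter($"tag".isNull).count() == 3)
    joined.show(false)
    spark.stop()
  }
}
```

Note that because the `null` replacements are random, the appended column is non-deterministic between runs, which is exactly what spreads the formerly single `null` partition across up to `nullNumBuckets` buckets.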
In short: I replace the left dataset's `null` join-key values with values from a negative range, so that they get evenly repartitioned.

NOTE: this solution only addresses a left join with `null` join-key skew; I didn't want to explode the right dataset and implement a general skew solution for arbitrary keys. Also, after this step, the `null` join-key values are distributed across different partitions, so `mapPartitions` etc. won't work on them as a group.

To summarize, the above solution helped me, but I'd like to see more solutions for this type of join issue.