Spark Dataset/DataFrame join: NULL skew key

While working with Spark Dataset/DataFrame joins, I ran into jobs that ran for a very long time or failed with OOM errors.

Here's the input:

  • ~10 datasets of different sizes, mostly huge (>1 TB)
  • all left-joined to one base dataset
  • some of the join keys are null

After some analysis, I found that the reason for the slow and failing jobs is a skewed null join key: the left side has millions of records whose join key is null.
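To see why null keys hurt, here is a plain-Scala model (no Spark; `NullKeyHotspot` and its methods are hypothetical names). It assumes a simplified partitioner that takes the key modulo the partition count and sends every null to one fixed slot. Spark's shuffle uses a Murmur3 hash rather than a modulo, but it likewise gives every null key the same hash, so all null-key rows on the left side of the join land in the same partition and therefore the same task.

```scala
// Plain-Scala model, not Spark. Assumption: a simplified partitioner that takes the key
// modulo numPartitions and puts every null (None) in partition 0.
object NullKeyHotspot {
  // Partition index for one join key; None stands for a SQL null.
  def partitionOf(key: Option[Long], numPartitions: Int): Int = key match {
    case Some(k) => Math.floorMod(k, numPartitions.toLong).toInt
    case None    => 0 // every null key shares the same slot
  }

  // Number of rows each partition receives.
  def partitionSizes(keys: Seq[Option[Long]], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionOf(_, numPartitions)).map { case (p, rows) => p -> rows.size }

  def main(args: Array[String]): Unit = {
    // 1,000 rows with distinct keys plus 9,000 rows whose key is null
    val keys: Seq[Option[Long]] = (1L to 1000L).map(Some(_)) ++ Seq.fill(9000)(None)
    val sizes = partitionSizes(keys, numPartitions = 10)
    println(s"largest partition: ${sizes.values.max} of ${keys.size} rows")
  }
}
```

In this model, nine partitions get 100 rows each, while partition 0 gets its 100 non-null rows plus all 9,000 nulls.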

I took a brute-force approach to solve this issue, and I want to share it here.

If you have a better solution, or know of a built-in one (for regular Apache Spark), please share it.

Stickpin answered 14/9, 2018 at 22:20

Here's the solution I came up with:

  // Assumed context: these members sit inside an object in a package named `transformation`,
  // which the `private[transformation]` modifiers require.
  import org.apache.spark.sql.{Column, DataFrame}
  import org.apache.spark.sql.functions.{col, lit, negate, pmod, rand, when}
  import org.apache.spark.sql.types.LongType

  /**
    * Expression that produces a random negative number between -1 and -`lowestValue` (inclusive).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("negative", negativeRandomWithin(3))
    *                  .select("negative")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |negative|
    *          +--------+
    *          |-2      |
    *          |-3      |
    *          |-1      |
    *          +--------+
    */
  private[transformation] def negativeRandomWithin(lowestValue: Long): Column = {
    // positiveRandomWithin yields [0, lowestValue - 1]; negating it and subtracting 1 gives [-lowestValue, -1]
    negate(positiveRandomWithin(lowestValue)) - 1
  }

  /**
    * Expression that produces a random number between 0 (inclusive) and `highestValue` (exclusive).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("positive", positiveRandomWithin(3))
    *                  .select("positive")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |positive|
    *          +--------+
    *          |0       |
    *          |1       |
    *          |2       |
    *          +--------+
    */
  private[transformation] def positiveRandomWithin(highestValue: Long): Column = {
    // rand() is uniform in [0, 1), so the truncated product is already in [0, highestValue - 1];
    // pmod only guarantees that the result stays in that range
    pmod((rand() * highestValue).cast(LongType), lit(highestValue))
  }

  implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {

    /**
      * Left outer join optimized for the case where the left side's join column is heavily skewed towards `null`.
      *
      * @note
      *       It works only for a single-column join on a column to which `isNotNull` applies.
      *
      * Optimization algorithm:
      *   1. replace `null` values in the left dataset's join column with a random negative number between -1 and -`nullNumBuckets` (10000 by default)
      *   2. join on an appended column that holds the original join value or its `null` replacement;
      *      the appended column's name is the original left join column name and `skewedColumnPostFix`, separated by an underscore.
      *
      * @note there is no check of how many `null` values the left dataset contains before applying the steps above,
      *       nor of whether the join will run as a sort-merge join or a broadcast join.
      *
      * IMPORTANT: if the left dataset already has the appended column, it is reused to benefit from data already repartitioned on the left.
      *
      * HIGHLY IMPORTANT: the right dataset must not contain negative values in `joinRightCol`.
      */
    private[transformation] def nullSkewLeftJoin(right: DataFrame,
                                                 joinLeftCol: Column,
                                                 joinRightCol: Column,
                                                 skewedColumnPostFix: String = "skewed_column",
                                                 nullNumBuckets: Int = 10000): DataFrame = {

      // Name of the appended join column: "<left join column>_<skewedColumnPostFix>"
      val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"

      if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
        // Reuse the salted column produced by an earlier nullSkewLeftJoin call
        underlying.join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      } else {
        underlying
          // Keep non-null keys as they are; spread null keys over nullNumBuckets negative values
          .withColumn(skewedTempColumn,
                      when(joinLeftCol.isNotNull, joinLeftCol).otherwise(negativeRandomWithin(nullNumBuckets)))
          .join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      }
    }
  }
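The range arithmetic in `positiveRandomWithin` and `negativeRandomWithin` can be checked without a cluster. The following plain-Scala mirror is a sketch, not Spark code: `scala.util.Random.nextDouble()` stands in for Spark's `rand()`, `Math.floorMod` plays the role of `pmod`, and `RandomWithinModel` is a hypothetical name.

```scala
import scala.util.Random

// Plain-Scala mirror of the Spark helpers (a sketch, not Spark code):
// scala.util.Random.nextDouble() stands in for Spark's rand().
object RandomWithinModel {
  // floor(r * n) with r in [0, 1) is already in [0, n - 1]; floorMod plays the role of pmod.
  def positiveRandomWithin(highestValue: Long, rnd: Random): Long =
    Math.floorMod((rnd.nextDouble() * highestValue).toLong, highestValue)

  // Negating [0, n - 1] gives [-(n - 1), 0]; subtracting 1 gives [-n, -1].
  def negativeRandomWithin(lowestValue: Long, rnd: Random): Long =
    -positiveRandomWithin(lowestValue, rnd) - 1

  def main(args: Array[String]): Unit = {
    val rnd = new Random(42)
    // Distinct values drawn for lowestValue = 3, as in the @example output
    println(Seq.fill(1000)(negativeRandomWithin(3, rnd)).distinct.sorted.mkString(", "))
  }
}
```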

In short: I replace the `null` join-key values in the left dataset with a range of negative numbers, so those rows are repartitioned evenly instead of all landing in one partition.
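A minimal plain-Scala model of this salting step, assuming the same simplified modulo partitioner as before (not Spark's Murmur3 hash) and 10 partitions; `NullSaltingModel` and its methods are hypothetical. Nulls are replaced by a random value in [-nullNumBuckets, -1], as `nullSkewLeftJoin` does, and the most loaded partition is compared before and after salting.

```scala
import scala.util.Random

// Plain-Scala model, not Spark. Assumption: the partition is the key modulo numPartitions,
// and every null (None) goes to partition 0.
object NullSaltingModel {
  def partitionOf(key: Option[Long], numPartitions: Int): Int = key match {
    case Some(k) => Math.floorMod(k, numPartitions.toLong).toInt
    case None    => 0
  }

  // Mirrors when(key.isNotNull, key).otherwise(negativeRandomWithin(nullNumBuckets)):
  // non-null keys pass through, nulls become a random value in [-nullNumBuckets, -1].
  def salt(key: Option[Long], nullNumBuckets: Int, rnd: Random): Option[Long] =
    key.orElse(Some(-rnd.nextInt(nullNumBuckets).toLong - 1))

  // Row count of the most loaded partition.
  def maxPartitionSize(keys: Seq[Option[Long]], numPartitions: Int): Int =
    keys.groupBy(partitionOf(_, numPartitions)).values.map(_.size).max

  def main(args: Array[String]): Unit = {
    val keys: Seq[Option[Long]] = (1L to 1000L).map(Some(_)) ++ Seq.fill(9000)(None)
    val rnd = new Random(42)
    val salted = keys.map(salt(_, 10000, rnd))
    println(s"before: ${maxPartitionSize(keys, 10)} rows, after: ${maxPartitionSize(salted, 10)} rows")
  }
}
```

With 9,000 null keys and 1,000 distinct non-null keys, the unsalted layout puts 9,100 rows in one partition; after salting, each of the 10 partitions holds roughly 1,000 rows.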

NOTE: this solution covers only left joins with `null` join-key skew. I didn't want to explode the right dataset to handle skew for arbitrary keys. Also, after this step, rows with a `null` join key are spread across different partitions, so mapPartitions and similar partition-level logic that expects them together won't work.
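Why salting leaves the left-join result unchanged, and why the right side must not contain negative keys, can be shown with an in-memory model (plain Scala collections, not Spark; `SaltedLeftJoinModel` is hypothetical). A null key never matches in a left join, and a salted negative key never matches either, as long as the right side has no negative keys. Right keys here are non-null `Long`s, which stands in for the `right.where(joinRightCol.isNotNull)` filter.

```scala
import scala.util.Random

// In-memory model with plain Scala collections, not Spark. A left row is (join key, payload),
// a right row is (key, payload), and a result row is (left payload, matched right payload).
object SaltedLeftJoinModel {
  // Plain left outer join: a None (null) key never matches, so the row is kept unmatched.
  def leftJoin(left: Seq[(Option[Long], String)],
               right: Seq[(Long, String)]): Seq[(String, Option[String])] =
    left.flatMap { case (key, l) =>
      val matches = key.toList.flatMap(k => right.collect { case (rk, r) if rk == k => r })
      if (matches.isEmpty) Seq((l, Option.empty[String])) else matches.map(r => (l, Option(r)))
    }

  // Salted variant: null keys become a random value in [-nullNumBuckets, -1] before joining,
  // mirroring when(isNotNull, key).otherwise(negativeRandomWithin(nullNumBuckets)).
  def saltedLeftJoin(left: Seq[(Option[Long], String)],
                     right: Seq[(Long, String)],
                     nullNumBuckets: Int,
                     rnd: Random): Seq[(String, Option[String])] = {
    val salted = left.map { case (key, l) =>
      (key.orElse(Some(-rnd.nextInt(nullNumBuckets).toLong - 1)), l)
    }
    leftJoin(salted, right)
  }

  def main(args: Array[String]): Unit = {
    val left: Seq[(Option[Long], String)] = Seq((Some(1L), "a"), (None, "b"), (Some(2L), "c"))
    val right = Seq((1L, "x"), (3L, "y"))
    println(leftJoin(left, right) == saltedLeftJoin(left, right, 10000, new Random(42)))
  }
}
```

With `nullNumBuckets = 1`, every null becomes -1, so a right-side row with key -1 gets attached to every formerly null left row; this is exactly the case the "HIGHLY IMPORTANT" note rules out.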

In summary, the solution above helped me, but I'd like to see other solutions for this type of dataset join issue.

Stickpin answered 14/9, 2018 at 22:20
Comment by Colloquy: Hi, I used your answer here (#57798059) and modified it a little. Hope you don't mind.

I had the same problem a while ago, but I chose a different approach after running some performance tests. It depends on your data: the data will tell you which algorithm best solves this join problem.

In my case, more than 30% of the rows on the left side of the join have a null key, and the data is stored as Parquet. Given that, it works better for me to split the data with two filters (key is null / key is not null), join only the non-null part, and then union both parts.

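import org.apache.spark.sql.functions.col

val data = ...
val notJoinable = data.filter(col("keyToJoin").isNull)
val joinable = data.filter(col("keyToJoin").isNotNull)

// A positional union needs equal column counts, but the join adds the right side's columns;
// unionByName with allowMissingColumns (Spark 3.1+) fills them with nulls for notJoinable
joinable.join(...).unionByName(notJoinable, allowMissingColumns = true)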

It also avoids the hotspot. With your approach (negative numbers, or any other non-joinable value), Spark would shuffle all of this data, which is a lot (more than 30%).
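The split-and-union alternative can be modeled the same way (plain Scala collections, not Spark; `SplitUnionModel` and its methods are hypothetical). It yields the same rows as a plain left join, possibly in a different order, but only the left rows with a non-null key go through the join, whereas salting sends every left row through the join and its shuffle.

```scala
// In-memory model with plain Scala collections, not Spark: the filter-split-union approach
// compared with a plain left join, plus how many left rows enter the join under each approach.
object SplitUnionModel {
  // Plain left outer join; a None (null) key never matches.
  def leftJoin(left: Seq[(Option[Long], String)],
               right: Seq[(Long, String)]): Seq[(String, Option[String])] =
    left.flatMap { case (key, l) =>
      val matches = key.toList.flatMap(k => right.collect { case (rk, r) if rk == k => r })
      if (matches.isEmpty) Seq((l, Option.empty[String])) else matches.map(r => (l, Option(r)))
    }

  // Split on the key, join only the non-null part, then append the null-key rows unmatched.
  def splitUnionLeftJoin(left: Seq[(Option[Long], String)],
                         right: Seq[(Long, String)]): Seq[(String, Option[String])] = {
    val (joinable, notJoinable) = left.partition(_._1.isDefined)
    leftJoin(joinable, right) ++ notJoinable.map { case (_, l) => (l, Option.empty[String]) }
  }

  // Left rows that go through the join: salting keeps all of them,
  // the split approach only the rows with a non-null key.
  def leftRowsJoinedSalted(left: Seq[(Option[Long], String)]): Int = left.size
  def leftRowsJoinedSplit(left: Seq[(Option[Long], String)]): Int = left.count(_._1.isDefined)

  def main(args: Array[String]): Unit = {
    val left: Seq[(Option[Long], String)] = Seq((Some(1L), "a"), (None, "b"), (None, "c"))
    val right = Seq((1L, "x"))
    println(splitUnionLeftJoin(left, right))
    println(s"joined rows: salted=${leftRowsJoinedSalted(left)}, split=${leftRowsJoinedSplit(left)}")
  }
}
```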

Just trying to show you another approach to your problem.

Inhesion answered 25/11, 2018 at 15:38
