Including null values in an Apache Spark Join
L

7

67

I would like to include rows with null keys in an Apache Spark join. By default, Spark drops rows whose join key is null.

Here is the default Spark behavior.

val numbersDf = Seq(
  ("123"),
  ("456"),
  (null),
  ("")
).toDF("numbers")

val lettersDf = Seq(
  ("123", "abc"),
  ("456", "def"),
  (null, "zzz"),
  ("", "hhh")
).toDF("numbers", "letters")

val joinedDf = numbersDf.join(lettersDf, Seq("numbers"))

Here is the output of joinedDf.show():

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
+-------+-------+

This is the output I would like:

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
|   null|    zzz|
+-------+-------+
Locally answered 18/1, 2017 at 20:21 Comment(0)
R
118

Spark provides a special NULL safe equality operator:

numbersDf
  .join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers"))
  .drop(lettersDf("numbers"))
+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|   null|    zzz|
|       |    hhh|
+-------+-------+
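
To make the difference between plain equality and `<=>` concrete, here is a minimal plain-Python sketch of the two comparison semantics. It does not use Spark at all: `None` stands in for NULL, and `eq`, `eq_null_safe` and `inner_join` are illustrative names invented for this sketch, not Spark APIs.

```python
# Plain-Python model of SQL comparison semantics; None stands in for NULL.
# eq, eq_null_safe and inner_join are illustrative names, not Spark APIs.

def eq(a, b):
    """Model of SQL `=`: the result is unknown (None) if either side is NULL."""
    if a is None or b is None:
        return None
    return a == b

def eq_null_safe(a, b):
    """Model of SQL `<=>`: always True or False; two NULLs compare as equal."""
    if a is None or b is None:
        return a is None and b is None
    return a == b

def inner_join(left, right, condition):
    """Nested-loop join that keeps a pair only when the condition is True."""
    return [(n, text) for n in left for (m, text) in right
            if condition(n, m) is True]

numbers = ["123", "456", None, ""]
letters = [("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")]

default_join = inner_join(numbers, letters, eq)
null_safe_join = inner_join(numbers, letters, eq_null_safe)

print(default_join)    # the (None, "zzz") pair is missing
print(null_safe_join)  # the (None, "zzz") pair is kept
```

With plain equality the comparison against a null key is never true, so that pair is filtered out; with the null-safe comparison it is retained, matching the output above.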

Be careful not to use it with Spark 1.5 or earlier. Prior to Spark 1.6 it required a Cartesian product (SPARK-11111 - Fast null-safe join).

In Spark 2.3.0 or later you can use Column.eqNullSafe in PySpark:

numbers_df = sc.parallelize([
    ("123", ), ("456", ), (None, ), ("", )
]).toDF(["numbers"])

letters_df = sc.parallelize([
    ("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")
]).toDF(["numbers", "letters"])

numbers_df.join(letters_df, numbers_df.numbers.eqNullSafe(letters_df.numbers))
+-------+-------+-------+
|numbers|numbers|letters|
+-------+-------+-------+
|    456|    456|    def|
|   null|   null|    zzz|
|       |       |    hhh|
|    123|    123|    abc|
+-------+-------+-------+

and %<=>% in SparkR:

numbers_df <- createDataFrame(data.frame(numbers = c("123", "456", NA, "")))
letters_df <- createDataFrame(data.frame(
  numbers = c("123", "456", NA, ""),
  letters = c("abc", "def", "zzz", "hhh")
))

head(join(numbers_df, letters_df, numbers_df$numbers %<=>% letters_df$numbers))
  numbers numbers letters
1     456     456     def
2    <NA>    <NA>     zzz
3                     hhh
4     123     123     abc

With SQL (Spark 2.2.0+) you can use IS NOT DISTINCT FROM:

SELECT * FROM numbers JOIN letters 
ON numbers.numbers IS NOT DISTINCT FROM letters.numbers

This can be used with the DataFrame API as well:

numbersDf.alias("numbers")
  .join(lettersDf.alias("letters"))
  .where("numbers.numbers IS NOT DISTINCT FROM letters.numbers")
Rochester answered 18/1, 2017 at 21:1 Comment(4)
Thanks. This is another good answer that uses the <=> operator. If you're doing a multiple column join, the conditions can be chained with the && operator. - Locally
In my experience (Spark 2.2.1 on Amazon Glue), the SQL syntax is the same as in Scala: SELECT * FROM numbers JOIN letters ON numbers.numbers <=> letters.numbers - Presa
Is there a way to use eqNullSafe if I am passing a list of columns to join's on parameter? - Bovid
@Rochester I have a similar question, but I want to do it with Seq. Can you help? The link is #61129118 - Factitious
C
12
// Rename the key columns so they can be told apart in the join condition.
val numbers2 = numbersDf.withColumnRenamed("numbers", "num1")
val letters2 = lettersDf.withColumnRenamed("numbers", "num2")
val joinCond = $"num1" === $"num2" || ($"num1".isNull && $"num2".isNull)
val joinedDf = numbers2.join(letters2, joinCond, "outer")
// Rename the column back to its original name.
joinedDf.select("num1", "letters").withColumnRenamed("num1", "numbers").show
Copepod answered 18/1, 2017 at 21:15 Comment(0)
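
The condition in the answer above spells out null-safe equality by hand as "equal, or both null". A small plain-Python sketch can check that, under SQL's three-valued logic, this expanded form keeps exactly the same pairs as a null-safe comparison. Nothing here calls Spark: `None` models NULL/unknown, and every function name is an assumption made up for illustration.

```python
# None models SQL NULL / "unknown". All names here are illustrative.
from itertools import product

def sql_eq(a, b):
    # SQL `=`: unknown if either operand is NULL.
    return None if a is None or b is None else a == b

def sql_and(x, y):
    # Three-valued AND: False wins, otherwise unknown wins, otherwise True.
    if x is False or y is False:
        return False
    if x is None or y is None:
        return None
    return True

def sql_or(x, y):
    # Three-valued OR: True wins, otherwise unknown wins, otherwise False.
    if x is True or y is True:
        return True
    if x is None or y is None:
        return None
    return False

def expanded(a, b):
    """a = b OR (a IS NULL AND b IS NULL)"""
    return sql_or(sql_eq(a, b), sql_and(a is None, b is None))

def null_safe(a, b):
    """Model of a null-safe comparison: never returns unknown."""
    if a is None or b is None:
        return a is None and b is None
    return a == b

values = ["123", "", None]
# A join keeps a pair only when its condition is True, so compare on that.
mismatches = [(a, b) for a, b in product(values, repeat=2)
              if (expanded(a, b) is True) != null_safe(a, b)]
print(mismatches)
```

The two forms are not identical as expressions (the expanded one yields unknown when exactly one side is null, the null-safe one yields false), but a join filter treats both outcomes the same way.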
F
7

Based on K L's idea, you could use foldLeft to generate the join column expression:

def nullSafeJoin(rightDF: DataFrame, columns: Seq[String], joinType: String)(leftDF: DataFrame): DataFrame = {
  // Start from the first column, then AND in one null-safe comparison per remaining column.
  val colExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
  val fullExpr = columns.tail.foldLeft(colExpr) {
    (acc, p) => acc && (leftDF(p) <=> rightDF(p))
  }

  leftDF.join(rightDF, fullExpr, joinType)
}

then, you could call this function just like:

aDF.transform(nullSafeJoin(bDF, columns, joinType))
Fallonfallout answered 8/9, 2019 at 21:24 Comment(1)
This is cleaner: val fullExpr = columns.map(n => leftDF(n) <=> rightDF(n)).reduceLeft(_ && _) - Husted
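
The fold/reduce pattern discussed above can be sketched without Spark as well. The plain-Python version below builds one null-safe comparison per key column and ANDs them together, which is the same shape as the foldLeft and reduceLeft variants. Rows are modelled as dictionaries, and `null_safe_eq`, `rows_match` and the sample column names are hypothetical.

```python
from functools import reduce

def null_safe_eq(a, b):
    """Model of a null-safe comparison; None stands in for NULL."""
    if a is None or b is None:
        return a is None and b is None
    return a == b

def rows_match(left_row, right_row, columns):
    """AND together one null-safe comparison per key column."""
    per_column = [null_safe_eq(left_row[c], right_row[c]) for c in columns]
    # Like head/reduceLeft on an empty Seq, reduce fails on an empty list.
    return reduce(lambda acc, ok: acc and ok, per_column)

# Hypothetical sample rows with a two-column key.
left = {"id": None, "region": "eu", "amount": 10}
right_same = {"id": None, "region": "eu", "label": "x"}
right_other = {"id": None, "region": None, "label": "y"}

match_same = rows_match(left, right_same, ["id", "region"])
match_other = rows_match(left, right_other, ["id", "region"])
print(match_same, match_other)
```

As in the Scala versions, at least one key column is required; an empty column list raises an error rather than producing an always-true condition.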
C
4

Complementing the other answers: for PySpark < 2.3.0 you have neither Column.eqNullSafe nor IS NOT DISTINCT FROM.

You can still build the <=> operator with a SQL expression and include it in the join, as long as you define aliases for the joined DataFrames:

from pyspark.sql.types import StringType
import pyspark.sql.functions as F

numbers_df = spark.createDataFrame(["123", "456", None, ""], StringType()).toDF("numbers")
letters_df = spark.createDataFrame(
    [("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")]
).toDF("numbers", "letters")

joined_df = numbers_df.alias("numbers").join(
    letters_df.alias("letters"),
    F.expr("numbers.numbers <=> letters.numbers")
).select("letters.*")
joined_df.show()

+-------+-------+
|numbers|letters|
+-------+-------+
|    456|    def|
|   null|    zzz|
|       |    hhh|
|    123|    abc|
+-------+-------+
Cumshaw answered 18/5, 2020 at 15:26 Comment(0)
I
0

This is a late answer but there is an elegant way to create eqNullSafe joins in PySpark:

from pyspark.sql.dataframe import DataFrame

def null_safe_join(self, other: DataFrame, cols: list, mode: str):
    """
    Function for null safe joins. In normal joins, rows with null keys never match.
    In a null safe join, null keys are treated as equal to each other.
    """
    join_cond = [self[c].eqNullSafe(other[c]) for c in cols]
    joined = self.join(other, join_cond, mode)
    # Drop the duplicate key columns one at a time: passing several Column
    # objects to a single drop() call raises a TypeError before PySpark 3.4.
    for c in cols:
        joined = joined.drop(other[c])
    return joined

DataFrame.null_safe_join = null_safe_join

This will add the null_safe_join method to the DataFrame class, allowing it to be used as a method on any DataFrame object like so:

joinedDf = numbersDf.null_safe_join(lettersDf, ["numbers"], "inner")

This performs an inner eqNullSafe join between numbersDf and lettersDf on the column numbers.

Illstarred answered 27/6 at 9:15 Comment(0)
H
-1

Based on timothyzhang's idea one can further improve it by removing duplicate columns:

def dropDuplicateColumns(df: DataFrame, rightDf: DataFrame, cols: Seq[String]): DataFrame =
  cols.foldLeft(df)((df, c) => df.drop(rightDf(c)))

def joinTablesWithSafeNulls(rightDF: DataFrame, leftDF: DataFrame, columns: Seq[String], joinType: String): DataFrame = {
  val colExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)

  val fullExpr = columns.tail.foldLeft(colExpr) {
    (colExpr, p) => colExpr && (leftDF(p) <=> rightDF(p))
  }

  val finalDF = leftDF.join(rightDF, fullExpr, joinType)

  // Drop the right-hand copies of the join columns.
  dropDuplicateColumns(finalDF, rightDF, columns)
}
Hygrograph answered 26/1, 2021 at 22:27 Comment(0)
C
-2

Try the following method to include the null rows in the result of the JOIN operator:

def nullSafeJoin(leftDF: DataFrame, rightDF: DataFrame, columns: Seq[String], joinType: String): DataFrame = {

    var columnsExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)

    columns.drop(1).foreach(column => {
        columnsExpr = columnsExpr && (leftDF(column) <=> rightDF(column))
    })

    var joinedDF: DataFrame = leftDF.join(rightDF, columnsExpr, joinType)

    columns.foreach(column => {
        joinedDF = joinedDF.drop(leftDF(column))
    })

    joinedDF
}
Chaldean answered 24/6, 2019 at 22:2 Comment(3)
This method has a problem: it drops the leftDF columns at the end, which is only correct for right joins. I proposed an edit with a TODO; I think it will work as it is (I'm using it now), but anyone who copies it should verify that too. - Immixture
The edit was rejected, god knows why. The following code should fix it when used as the last foreach: columns.foreach(column => { if (joinType.contains("right")) { joinedDF = joinedDF.drop(leftDF(column)) } else { joinedDF = joinedDF.drop(rightDF(column)) } }) - Immixture
Very true, or you could call it with the order reversed, so left and right are switched. - Chaldean
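
The point raised in these comments, which side's duplicate key column to drop, can be illustrated with a small plain-Python sketch. It assumes join type strings such as "right" or "right_outer" and models one joined row as a dictionary. `side_to_drop`, `drop_duplicate_key` and the sample row are hypothetical and only mirror the `joinType.contains("right")` check proposed above.

```python
def side_to_drop(join_type: str) -> str:
    """Return which input's duplicate key columns to drop after the join."""
    # A right join keeps every right-hand row, so an unmatched row carries its
    # key only in the right-hand column; drop the left-hand copy in that case.
    return "left" if "right" in join_type.lower() else "right"

def drop_duplicate_key(row: dict, key: str, join_type: str) -> dict:
    """Remove the redundant copy of the key column from one joined row."""
    unwanted = f"{side_to_drop(join_type)}.{key}"
    return {name: value for name, value in row.items() if name != unwanted}

# Hypothetical unmatched row as a right join would emit it:
# every left-hand column is NULL (None), the right-hand key is populated.
joined_row = {"left.numbers": None, "right.numbers": "789", "letters": "qqq"}

kept = drop_duplicate_key(joined_row, "numbers", "right_outer")
lost = drop_duplicate_key(joined_row, "numbers", "left_outer")
print(kept)  # the key value "789" survives
print(lost)  # only the NULL copy of the key is left
```

This sketch does not cover full outer joins, where unmatched rows from either side mean that neither copy of the key is complete on its own.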

© 2022 - 2024 — McMap. All rights reserved.