Filter Spark DataFrame based on another DataFrame that specifies denylist criteria

I have a largeDataFrame (multiple columns and billions of rows) and a smallDataFrame (single column and 10,000 rows).

I'd like to filter out all rows from the largeDataFrame whose some_identifier column matches one of the values in the smallDataFrame.

Here's an example:

largeDataFrame

some_identifier,first_name
111,bob
123,phil
222,mary
456,sue

smallDataFrame

some_identifier
123
456

desiredOutput

111,bob
222,mary

Here is my ugly solution.

val smallDataFrame2 = smallDataFrame.withColumn("is_bad", lit("bad_row"))
val desiredOutput = largeDataFrame
  .join(broadcast(smallDataFrame2), Seq("some_identifier"), "left")
  .filter($"is_bad".isNull)
  .drop("is_bad")

Is there a cleaner solution?

Coccus answered 6/10, 2016 at 4:27 Comment(0)

You'll need to use a left_anti join in this case.

The left anti join is the opposite of a left semi join.

It removes from the left table every row whose key appears in the right table:

largeDataFrame
   .join(smallDataFrame, Seq("some_identifier"),"left_anti")
   .show
// +---------------+----------+
// |some_identifier|first_name|
// +---------------+----------+
// |            222|      mary|
// |            111|       bob|
// +---------------+----------+
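
For contrast, here is a minimal sketch of the left_semi counterpart (assuming the same two DataFrames; output row order may differ). It keeps only the rows whose key does appear in smallDataFrame:

largeDataFrame
   .join(smallDataFrame, Seq("some_identifier"), "left_semi")
   .show
// +---------------+----------+
// |some_identifier|first_name|
// +---------------+----------+
// |            123|      phil|
// |            456|       sue|
// +---------------+----------+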
Distracted answered 6/10, 2016 at 6:45 Comment(8)
And this is why you don't use Strings when you really want enums - the scaladoc of DataSet.join doesn't mention leftanti as an option, so it's impossible to figure out what options there are without going on a deep dive. I'm starting to feel like this warrants a Jira -- and I was already peeved by that choice of API before.Selfconsequence
To be honest, when I wrote this answer, Datasets were experimental and I'm still not a big fan.Distracted
Thankfully Jacek has the full (or so I would like to hope) documentation for joins: jaceklaskowski.gitbooks.io/mastering-apache-spark/content/… -- hopefully leaving this here makes someone else's life easier.Selfconsequence
@RickMoritz The link is no longer available.Porch
It should be "left_anti" rather than "leftanti".Kwangchowan
Thanks @AndersEriksson ! Honest typoDistracted
Link for reference: jaceklaskowski.gitbooks.io/mastering-spark-sql/…Angle
Thanks @Angle ! I know that reference and I know Jacek well.Distracted

A version in pure Spark SQL (using PySpark as an example, but with small changes the same applies to the Scala API):

def string_to_dataframe(df_name, csv_string):
    # Parallelize the CSV text, read it into a DataFrame with an inferred
    # schema, and register it as a temp table so it can be queried via spark.sql.
    rdd = spark.sparkContext.parallelize(csv_string.split("\n"))
    df = spark.read.option('header', 'true').option('inferSchema', 'true').csv(rdd)
    df.registerTempTable(df_name)

string_to_dataframe("largeDataFrame", '''some_identifier,first_name
111,bob
123,phil
222,mary
456,sue''')

string_to_dataframe("smallDataFrame", '''some_identifier
123
456
''')

anti_join_df = spark.sql("""
    select * 
    from largeDataFrame L
    where NOT EXISTS (
            select 1 from smallDataFrame S
            WHERE L.some_identifier = S.some_identifier
        )
""")

print(anti_join_df.take(10))

anti_join_df.explain()

This outputs mary and bob, as expected:

[Row(some_identifier=222, first_name='mary'),
Row(some_identifier=111, first_name='bob')]

and the physical execution plan shows that a Sort Merge Join is being used:

== Physical Plan ==
SortMergeJoin [some_identifier#252], [some_identifier#264], LeftAnti
:- *(1) Sort [some_identifier#252 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(some_identifier#252, 200)
:     +- Scan ExistingRDD[some_identifier#252,first_name#253]
+- *(3) Sort [some_identifier#264 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(some_identifier#264, 200)
      +- *(2) Project [some_identifier#264]
         +- Scan ExistingRDD[some_identifier#264]

Notice that a Sort Merge Join is more efficient for joining / anti-joining data sets that are approximately the same size. Since you have mentioned that the small dataframe is smaller, we should make sure the Spark optimizer chooses a Broadcast Hash Join, which will be much more efficient in this scenario:

I will change the NOT EXISTS clause to NOT IN for this:

anti_join_df = spark.sql("""
    select * 
    from largeDataFrame L
    where L.some_identifier NOT IN (
            select S.some_identifier
            from smallDataFrame S
        )
""")

anti_join_df.explain()

Let's see what that gives us:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftAnti, ((some_identifier#302 = some_identifier#314) || isnull((some_identifier#302 = some_identifier#314)))
:- Scan ExistingRDD[some_identifier#302,first_name#303]
+- BroadcastExchange IdentityBroadcastMode
   +- Scan ExistingRDD[some_identifier#314]

Notice that the Spark optimizer actually chose a Broadcast Nested Loop Join and not a Broadcast Hash Join. The NOT IN rewrite adds the null-aware isnull(...) condition visible in the plan, so the join condition is no longer a plain equality and a hash join cannot be used; the nested loop join is still fine here, since the broadcast side has just two records to exclude against.

Also notice that both execution plans have LeftAnti, so this is similar to @eliasah's answer, but implemented in pure SQL. Plus, it shows that you can have more control over the physical execution plan.

PS. Also keep in mind that if the right dataframe is much smaller than the left-side dataframe but bigger than just a few records, you do want a Broadcast Hash Join rather than a Broadcast Nested Loop Join or a Sort Merge Join. If that doesn't happen, you may need to increase spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB, since it has to be bigger than the size of the "smallDataFrame".
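
A minimal Scala sketch of raising that threshold and, alternatively, of forcing the broadcast with an explicit hint (the 100 MB value and the variable names are illustrative assumptions):

// Raise the auto-broadcast threshold (default 10 MB) so the optimizer is
// allowed to broadcast a larger small side.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)

// Or hint the broadcast explicitly; with an equi-join key this should plan a
// BroadcastHashJoin (LeftAnti) that builds on the small side.
import org.apache.spark.sql.functions.broadcast
val filtered = largeDataFrame
  .join(broadcast(smallDataFrame), Seq("some_identifier"), "left_anti")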

Fanion answered 6/3, 2019 at 22:7 Comment(0)
