A version in pure Spark SQL (using PySpark as an example, but with small changes the same is applicable to the Scala API):
def string_to_dataframe(df_name, csv_string):
    # Turn the CSV text into an RDD of lines and let Spark parse it with a header and inferred schema
    rdd = spark.sparkContext.parallelize(csv_string.split("\n"))
    df = spark.read.option('header', 'true').option('inferSchema', 'true').csv(rdd)
    # Register the result as a temp view so it can be queried via spark.sql
    # (createOrReplaceTempView is the non-deprecated equivalent in Spark 2.x+)
    df.registerTempTable(df_name)
string_to_dataframe("largeDataFrame", '''some_identifier,first_name
111,bob
123,phil
222,mary
456,sue''')
string_to_dataframe("smallDataFrame", '''some_identifier
123
456
''')
anti_join_df = spark.sql("""
select *
from largeDataFrame L
where NOT EXISTS (
select 1 from smallDataFrame S
WHERE L.some_identifier = S.some_identifier
)
""")
print(anti_join_df.take(10))
anti_join_df.explain()
This will output mary and bob, as expected:
[Row(some_identifier=222, first_name='mary'),
Row(some_identifier=111, first_name='bob')]
and the physical execution plan will show that it is using a Sort Merge Join:
== Physical Plan ==
SortMergeJoin [some_identifier#252], [some_identifier#264], LeftAnti
:- *(1) Sort [some_identifier#252 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(some_identifier#252, 200)
: +- Scan ExistingRDD[some_identifier#252,first_name#253]
+- *(3) Sort [some_identifier#264 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(some_identifier#264, 200)
+- *(2) Project [some_identifier#264]
+- Scan ExistingRDD[some_identifier#264]
Notice the Sort Merge Join: it is more efficient for joining / anti-joining data sets that are approximately the same size.
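As a side note, the same anti-join can also be written as an explicit LEFT ANTI JOIN in Spark SQL. This is just a minimal sketch (assuming Spark 2.0+, where that syntax is available), and it should be planned the same way as the NOT EXISTS version:
# Explicit LEFT ANTI JOIN against the registered temp views; also planned as LeftAnti
left_anti_df = spark.sql("""
select *
from largeDataFrame L
left anti join smallDataFrame S
on L.some_identifier = S.some_identifier
""")
left_anti_df.explain()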
Since you have mentioned that the small dataframe is smaller, we should make sure that the Spark optimizer chooses a Broadcast Hash Join, which will be much more efficient in this scenario. I will change the NOT EXISTS clause to NOT IN for this:
anti_join_df = spark.sql("""
select *
from largeDataFrame L
where L.some_identifier NOT IN (
select S.some_identifier
from smallDataFrame S
)
""")
anti_join_df.explain()
Let's see what it gave us:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftAnti, ((some_identifier#302 = some_identifier#314) || isnull((some_identifier#302 = some_identifier#314)))
:- Scan ExistingRDD[some_identifier#302,first_name#303]
+- BroadcastExchange IdentityBroadcastMode
+- Scan ExistingRDD[some_identifier#314]
Notice that the Spark optimizer actually chose a Broadcast Nested Loop Join and not a Broadcast Hash Join. The former is okay here, since we have just two records to exclude from the left side.
Also notice that both execution plans do have LeftAnti, so this is similar to @eliasah's answer, but implemented using pure SQL. Plus, it shows that you can have more control over the physical execution plan.
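If you want to push the optimizer towards a Broadcast Hash Join explicitly while staying in pure SQL, a broadcast hint can be combined with the LEFT ANTI JOIN form. A minimal sketch, assuming Spark 2.2+ where the BROADCAST hint is supported:
# Hint Spark to broadcast the small side; with an equi-join condition this
# should be planned as a BroadcastHashJoin with LeftAnti
hinted_anti_join_df = spark.sql("""
select /*+ BROADCAST(S) */ *
from largeDataFrame L
left anti join smallDataFrame S
on L.some_identifier = S.some_identifier
""")
hinted_anti_join_df.explain()
The same effect can be achieved in the DataFrame API by wrapping the small side in pyspark.sql.functions.broadcast and joining with how='left_anti'.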
PS. Also keep in mind that if the right dataframe is much smaller than the left-side dataframe but bigger than just a few records, you do want a Broadcast Hash Join and not a Broadcast Nested Loop Join nor a Sort Merge Join. If that doesn't happen, you may need to increase spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB, so that it is bigger than the size of the "smallDataFrame".
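For example, a minimal sketch of raising the threshold at runtime (the 100 MB value is arbitrary, purely for illustration; the setting is in bytes):
# Allow Spark to broadcast relations up to ~100 MB (default is 10 MB); -1 disables auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))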