How to join on multiple columns in PySpark?

I am using Spark 1.3 and would like to join on multiple columns using the Python interface (Spark SQL).

The following works:

I first register them as temp tables.

numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")

test  = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')

I would now like to join them based on multiple columns.

I get SyntaxError: invalid syntax with this:

test  = numeric.join(Ref,
   numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
   numeric.STATUS == Ref.STATUS ,  joinType='inner')
Vibrato asked 16/11, 2015 at 22:37
Answer (score 154)

You should use the & / | operators and be careful about operator precedence (== binds less tightly than bitwise & and |, so wrap each comparison in parentheses):

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))

df = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))
df.show()

## +---+---+---+---+---+---+
## | x1| x2| x3| x1| x2| x3|
## +---+---+---+---+---+---+
## |  2|  b|3.0|  2|  b|0.0|
## +---+---+---+---+---+---+
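
To see why the parentheses matter: without them, Python parses & before ==, which turns the condition into a chained comparison on Column objects that cannot be evaluated. A minimal sketch (the exact error text varies by Spark version):

# Without parentheses,
#   df1.x1 == df2.x1 & df1.x2 == df2.x2
# is parsed as the chained comparison
#   df1.x1 == (df2.x1 & df1.x2) == df2.x2
# Chained comparisons implicitly call bool() on a Column, which raises
# an error along the lines of "Cannot convert column into bool".
# df_bad = df1.join(df2, df1.x1 == df2.x1 & df1.x2 == df2.x2)  # raises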
Autoplasty answered 16/11, 2015 at 22:50
When you say 'be careful about operator precedence', what do you mean? Do you mean I should put parentheses in the right place to AND the correct tables together? – Berkow
@Chogg, what he means is that if you're not careful with parentheses, the expression df1.x1 == df2.x1 & df1.x2 == df2.x2 (parentheses removed) would be evaluated by the Python interpreter as df1.x1 == (df2.x1 & df1.x2) == df2.x2, which would potentially throw a confusing and non-descriptive error. – Poore
Why does it produce the columns x1 and x2 twice? – Liss
Answer (score 95)

An alternative approach would be:

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x4"))

df = df1.join(df2, ['x1','x2'])
df.show()

which outputs:

+---+---+---+---+
| x1| x2| x3| x4|
+---+---+---+---+
|  2|  b|3.0|0.0|
+---+---+---+---+

The main advantage is that the columns on which the tables are joined are not duplicated in the output, which reduces the risk of encountering errors such as org.apache.spark.sql.AnalysisException: Reference 'x1' is ambiguous, could be: x1#50L, x1#57L.
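
For instance, with the expression-based join both copies of x1 survive, and only a reference qualified through its source DataFrame stays unambiguous (a minimal sketch reusing df1 and df2 from above):

df_dup = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))
# df_dup.select("x1")   # would raise the AnalysisException quoted above
df_dup.select(df1.x1)   # fine: qualified via the source DataFrame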


Whenever the columns in the two tables have different names (let's say in the example above, df2 has the columns y1, y2 and y4), you could use the following syntax:

df = df1.join(df2.withColumnRenamed('y1','x1').withColumnRenamed('y2','x2'), ['x1','x2'])
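
If several key columns need renaming, the same idea extends with a loop; the mapping dict below is purely illustrative:

mapping = {'y1': 'x1', 'y2': 'x2'}  # hypothetical old -> new names
df2_renamed = df2
for old, new in mapping.items():
    df2_renamed = df2_renamed.withColumnRenamed(old, new)

df = df1.join(df2_renamed, list(mapping.values()))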
Epidemic answered 23/4, 2018 at 7:49
What if I do an outer join and would like to keep only a single occurrence of the key? – Albania
This is probably my least favorite PySpark error: Reference 'x1' is ambiguous, could be: x1#50L, x1#57L. I don't understand why it lets you do something like df = df1.join(df2, df1.x1 == df2.x1) and then errors as soon as you try to do almost anything with the resulting df. That's just a minor rant, but is there any reason why you'd ever want the resulting df with duplicated names? – Jimmy
Answer (score 13)

# A list of Column expressions passed to `on` is ANDed together,
# so this joins where ID, TYPE and STATUS all match.
test = numeric.join(Ref,
   on=[
     numeric.ID == Ref.ID,
     numeric.TYPE == Ref.TYPE,
     numeric.STATUS == Ref.STATUS
   ], how='inner')
Scribner answered 11/5, 2021 at 18:22
Welcome to StackOverflow. Can you maybe explain your code a bit more? Why is it structured like this? How does it work? etc. – Arbitrage
Answers are great, but for best practices please provide an explanation. Only posting code makes the OP and future comers copy and paste your answer without understanding the logic behind it. Please provide an answer with some explanation. Thank you! – Ganley
Answer (score 1)

You can also provide a list of strings if the column names are the same.

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))

df = df1.join(df2, ["x1","x2"])

df.show()
+---+---+---+---+
| x1| x2| x3| x3|
+---+---+---+---+
|  2|  b|3.0|0.0|
+---+---+---+---+

Another way to go about this, if the column names are different and you want to rely on column-name strings, is the following (note the import of col from pyspark.sql.functions):

from pyspark.sql.functions import col

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("y1", "y2", "y3"))

df = df1.join(df2, (col("x1") == col("y1")) & (col("x2") == col("y2")))

df.show()
+---+---+---+---+---+---+
| x1| x2| x3| y1| y2| y3|
+---+---+---+---+---+---+
|  2|  b|3.0|  2|  b|0.0|
+---+---+---+---+---+---+

This is useful if you want to reference column names dynamically, and also in instances where there is a space in the column name and you cannot use the df.col_name syntax. You should look at renaming such a column anyway, though.
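
For example (a hypothetical DataFrame, just to illustrate the point about spaces), attribute access is a syntax error while col() works:

df3 = sqlContext.createDataFrame([(1, "a")], ["id", "first name"])
# df3.select(df3.first name)  # SyntaxError: attribute access cannot contain a space
df3.select(col("first name")).show()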

Czech answered 1/8, 2022 at 9:06
