Spark Dataframe distinguish columns with duplicated name
R

13

151

As far as I know, a Spark DataFrame can have multiple columns with the same name, as shown in the DataFrame snapshot below:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

The result above was created by joining a DataFrame to itself; you can see there are 4 columns, two named a and two named f.

The problem is that when I try to do more calculations with the a column, I can't find a way to select it. I have tried df[0] and df.select('a'); both return the error message below:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

Is there any way in the Spark API to distinguish between columns with duplicated names? Or is there some way to change the column names?

Ramtil answered 18/11, 2015 at 11:16 Comment(0)
A
79

I would recommend that you change the column names for your join.

df1.select(col("a") as "df1_a", col("f") as "df1_f")
   .join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a") === col("df2_a"))

The resulting DataFrame will have schema

(df1_a, df1_f, df2_a, df2_f)
Alvardo answered 18/11, 2015 at 11:33 Comment(8)
You may need to fix your answer since the quotes aren't adjusted properly between column names.Meiosis
@SamehSharaf I assume that you are the one down voting my answer? But the answer is in fact 100% correct - I'm simply using the scala '-shorthand for column selection, so there is in fact no problem with quotes.Alvardo
@GlennieHellesSindholt, fair point. It is confusing because the answer is tagged as python and pyspark.Gingersnap
What if each dataframe contains 100+ columns and we just need to rename one column name that is the same? Surely, can't manually type in all those column names in the select clauseSoftshoe
In that case you could go with df1.withColumnRenamed("a", "df1_a")Alvardo
@GlennieHellesSindholt would you be able to write an pyspark equivalent of this answer? pleaseAnemic
@Dee Just have a look at the answer below from zero323.Alvardo
@GlennieHellesSindholt Wondering if schema change approach could solve my issue: #63966539Fernandina
S
161

Let's start with some data:

from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=125231, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])

df2 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])

There are a few ways you can approach this problem. First, you can unambiguously reference a column of the joined result through the parent DataFrame it came from:

df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

You can also use table aliases:

from pyspark.sql.functions import col

df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")

df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

Finally you can programmatically rename columns:

df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))

df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)

## +--------------------+
## |               f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
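For the self-join in the original question (one DataFrame joined to itself), the alias approach can be combined with a final rename so that the result has unique, flat column names. This is only a minimal sketch and not part of the original answer: the helper names qualified_selection and self_join_flat and the aliases "l" and "r" are made up for illustration, and it assumes column names that contain no dots.

```python
def qualified_selection(alias, columns):
    """Pair each column with its alias-qualified reference and a flat output name.

    Example: ("l", ["a", "f"]) gives [("l.a", "l_a"), ("l.f", "l_f")].
    """
    return [(f"{alias}.{name}", f"{alias}_{name}") for name in columns]


def self_join_flat(df, key):
    """Join df with itself on `key` and return uniquely named columns.

    Assumes df is a pyspark.sql.DataFrame. pyspark is imported lazily so that
    qualified_selection stays usable without a Spark installation.
    """
    from pyspark.sql.functions import col

    # Two aliases of the same DataFrame give each side its own qualifier.
    left, right = df.alias("l"), df.alias("r")
    joined = left.join(right, col(f"l.{key}") == col(f"r.{key}"))

    # Select every column through its qualifier and give it a flat name.
    picks = qualified_selection("l", df.columns) + qualified_selection("r", df.columns)
    return joined.select(*[col(ref).alias(out) for ref, out in picks])


# Hypothetical usage with the question's DataFrame:
# flat = self_join_flat(df, "a")
# flat.select("l_a", "r_f")   # no longer ambiguous
```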
Santiagosantillan answered 18/11, 2015 at 11:44 Comment(5)
Thanks for your edit showing so many ways of getting the correct column in these ambiguous cases. I do think your examples should go into the Spark programming guide. I've learned a lot!Ramtil
small correction: df2_r = **df2** .select(*(col(x).alias(x + '_df2') for x in df2.columns)) instead of df2_r = df1.select(*(col(x).alias(x + '_df2') for x in df2.columns)). For the rest, good stuffRose
I agree that this should be part of the Spark programming guide. Pure gold. I was finally able to untangle the source of the ambiguity: selecting columns by the old names before doing the join. With the solution of programmatically appending suffixes to the column names before the join, all the ambiguity went away.Speechmaker
@Ramtil : Did you understand why the renaming was needed df1_a = df1.alias("df1_a") and why we can't use df1 and df2 directly? This answer did not explain why the renaming was needed to make select('df1_a.f') workZolazoldi
@Zolazoldi It's in application to the original problem where there is one table df being joined with itself. Perhaps the solution would make more sense if it had written df.alias("df1_a") and df.alias("df2_a").Ultrasonics
G
82

There is a simpler way than writing aliases for all of the columns you are joining on by doing:

df1.join(df2,['a'])

This works if the key that you are joining on is the same in both tables.

See https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html
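Note that joining on a list of names only merges the key columns. Any other name shared by both sides (f in the question) still appears twice in the result. A small sketch for finding those names so they can be renamed before the join; still_ambiguous is a hypothetical helper, not a Spark function:

```python
def still_ambiguous(left_columns, right_columns, keys):
    """Return names present on both sides that are not join keys.

    These are the columns that remain duplicated after
    left.join(right, keys).
    """
    right = set(right_columns)
    return [name for name in left_columns if name in right and name not in keys]


# With the question's schema, joining on ['a'] leaves 'f' duplicated.
print(still_ambiguous(["a", "f"], ["a", "f"], ["a"]))  # ['f']

# Hypothetical usage with PySpark DataFrames df1 and df2:
# for name in still_ambiguous(df1.columns, df2.columns, ["a"]):
#     df2 = df2.withColumnRenamed(name, name + "_df2")
# joined = df1.join(df2, ["a"])
```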

Grammer answered 19/6, 2018 at 16:55 Comment(5)
this is the actual answer as of Spark 2+Cricket
And for Scala: df1.join(df2, Seq("a"))Lipoid
page was moved to: kb.databricks.com/data/…Copenhagen
Glad I kept scrolling, THIS is the much better answer. If columns have different names, then no ambiguity issue. If columns have the same name, do this. There is little reason to ever need to deal with ambiguous col names with this method.Millur
I am doing the same, but I am joining on two columns. Will this work with more than one column? If yes, then I don't know why it is not working for me: df1.join(df2,['a','b'])Lebna
S
16

This is how we can join two DataFrames on columns that have the same names in PySpark.

df = df1.join(df2, ['col1','col2','col3'])

If you do printSchema() after this then you can see that duplicate columns have been removed.

Shoifet answered 26/7, 2018 at 12:26 Comment(0)
S
12

You can use the drop(col: Column) method to drop the duplicated column, for example:

DataFrame:df1

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

DataFrame:df2

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

When I join df1 with df2, the resulting DataFrame looks like this:

val newDf = df1.join(df2,df1("a")===df2("a"))

DataFrame:newDf

+-------+-----+-------+-----+
| a     | f   | a     | f   |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+

Now we can use the drop(col: Column) method to drop the duplicated columns 'a' and 'f', as follows:

val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
Seedman answered 22/8, 2016 at 9:14 Comment(2)
Would this approach work if you are doing an outer join and the two columns have some dissimilar values?Elissa
You may not want to drop if different relations with same schema.Neidaneidhardt
P
9

Suppose the DataFrames you want to join are df1 and df2, and you are joining them on column 'a'. Then you have two methods:

Method 1

df1.join(df2,'a','left_outer')

This is an awesome method and it is highly recommended.

Method 2

df1.join(df2,df1.a == df2.a,'left_outer').drop(df2.a)

Purchasable answered 27/4, 2018 at 2:26 Comment(0)
R
5

After digging into the Spark API, I found that I can first use alias to create an alias for the original DataFrame, and then use withColumnRenamed to manually rename every column on the alias. This lets me do the join without duplicating column names.

More detail can be found in the Spark DataFrame API:

pyspark.sql.DataFrame.alias

pyspark.sql.DataFrame.withColumnRenamed

However, I think this is only a cumbersome workaround, and I wonder whether there is a better way to solve my problem.

Ramtil answered 18/11, 2015 at 11:24 Comment(0)
R
5

If only the key column is the same in both tables, then try the following (approach 1):

left.join(right, 'key', 'inner')

rather than the following (approach 2):

left.join(right, left.key == right.key, 'inner')

Pros of using approach 1:

  • the 'key' column appears only once in the final DataFrame
  • the syntax is easy to use

Cons of using approach 1:

  • it only helps with the key column
  • in a left join, if you plan to count nulls in the right-hand key, this will not work, because the right key is not kept as a separate column. In that case, rename one of the keys as mentioned above.
Rosemarierosemary answered 10/12, 2019 at 13:36 Comment(0)
T
3

This might not be the best approach, but if you want to rename the duplicate columns (after the join), you can do so using this tiny function.

def rename_duplicate_columns(dataframe):
    columns = dataframe.columns
    duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
    for index in duplicate_column_indices:
        columns[index] = columns[index]+'2'
    dataframe = dataframe.toDF(*columns)
    return dataframe
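A variant of the same idea that handles any number of repeats and leaves the first occurrence of each name untouched. This is only a sketch: unique_names is a hypothetical helper, and it assumes that generated names such as a_2 do not already exist in the DataFrame.

```python
def unique_names(columns):
    """Make a list of column names unique by suffixing repeats with a counter.

    The first occurrence keeps its name; later ones become name_2, name_3, ...
    """
    seen = {}
    result = []
    for name in columns:
        seen[name] = seen.get(name, 0) + 1
        result.append(name if seen[name] == 1 else f"{name}_{seen[name]}")
    return result


# Column names of the self-joined DataFrame from the question.
print(unique_names(["a", "f", "a", "f"]))  # ['a', 'f', 'a_2', 'f_2']

# Hypothetical usage on a joined PySpark DataFrame:
# dataframe = dataframe.toDF(*unique_names(dataframe.columns))
```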
Trost answered 28/8, 2019 at 11:45 Comment(0)
F
3

If you have a more complicated use case than the one described in Glennie Helles Sindholt's answer, e.g. there are other non-join columns that also share a name and you want to distinguish them while selecting, it's best to use aliases, e.g.:

df3 = df1.select("a", "b").alias("left")\
   .join(df2.select("a", "b").alias("right"), ["a"])\
   .select("left.a", "left.b", "right.b")

df3.columns
['a', 'b', 'b']
Fissile answered 5/9, 2019 at 17:3 Comment(0)
V
0

What worked for me

import databricks.koalas as ks

df1k = df1.to_koalas()
df2k = df2.to_koalas()
df3k = df1k.merge(df2k, on=['col1', 'col2'])
df3 = df3k.to_spark()

All of the columns except for col1 and col2 had "_x" appended to their names if they had come from df1 and "_y" appended if they had come from df2, which is exactly what I needed.

Versicle answered 3/2, 2021 at 14:11 Comment(0)
S
0

Pyspark 3.2.1 +

I found a simple way of doing this in Spark 3.2.1 using toDF:

df.show()
+------+------+---------+
|number|  word|     word|
+------+------+---------+
|     1| apple|   banana|
|     2|cherry|     pear|
|     3| grape|pineapple|
+------+------+---------+

df = df.toDF(*[val + str(i) for i, val in enumerate(df.columns)])

df.show()
+-------+------+---------+
|number0| word1|    word2|
+-------+------+---------+
|      1| apple|   banana|
|      2|cherry|     pear|
|      3| grape|pineapple|
+-------+------+---------+
Splendor answered 30/1, 2022 at 17:12 Comment(0)
L
0

Definitely late to the party, but the approach below works for me with any number of identical column names during a DataFrame join. One way to avoid your issue is to rename the columns in one of the DataFrames.

I am using reduce with a lambda to rename the columns.

dataframe:df1
+-------+-----+
| a     | f   |
+-------+-----+
|107831 | xyz |
|107831 | abc |
+-------+-----+

dataframe:df2
+-------+-----+
| a     | f   |
+-------+-----+
|107831 | efg |
|107831 | jkl |
+-------+-----+

from functools import reduce

df_new = reduce(lambda df, col: df.withColumnRenamed(col, col+'_new'), df1.columns, df1)
df_new.printSchema()

root
 |-- a_new: string (nullable = true)
 |-- f_new: string (nullable = true)

Now the two DataFrames have different column names and can be joined:

df_new.join(df2, df_new['a_new'] == df2['a'])

To select only the columns from df_new:

df_new.join(df2, df_new['a_new'] == df2['a']).select(*df_new.columns)
Ledford answered 28/2 at 21:52 Comment(0)
