How to use monotonically_increasing_id to join two pyspark dataframes having no common column?
C

3

1

I have two PySpark DataFrames with the same number of rows, but they have no common column. So I am adding a new column to both of them using monotonically_increasing_id():

from pyspark.sql.functions import monotonically_increasing_id as mi
id=mi()
df1 = df1.withColumn("match_id", id)
cont_data = cont_data.withColumn("match_id", id)
cont_data = cont_data.join(df1,df1.match_id==cont_data.match_id, 'inner').drop(df1.match_id)

But after the join, the resulting DataFrame has fewer rows. What am I missing here? Thanks.

Constituent answered 2/6, 2017 at 20:1 Comment(2)
If the datasets have nothing in common, isn't sorting relevant to what the result is going to look like? I'd say the way your datasets are sorted is the implicit join condition. Cleary
monotonically_increasing_id will not generate the same values for df1 and cont_data. This is the expected behavior. It generates unique numbers each time it is called: spark.apache.org/docs/3.1.3/api/python/reference/api/… For the purpose of joining, you have to treat this function like a random number generator. Donettedoney
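To illustrate the comment above: the Spark documentation states that the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The following is a plain-Python simulation of that layout, not Spark itself; the helper names `simulate_ids` and `decode_id` and the partition sizes are hypothetical. It sketches how two DataFrames with the same row count but different partition layouts end up with only partially overlapping IDs, which is why an inner join drops rows.

```python
PARTITION_SHIFT = 33  # lower 33 bits hold the record number within a partition


def simulate_ids(partition_sizes):
    """Mimic the documented ID layout for a list of rows-per-partition counts."""
    return [
        (partition << PARTITION_SHIFT) + record
        for partition, size in enumerate(partition_sizes)
        for record in range(size)
    ]


def decode_id(value):
    """Split an ID into (partition index, record number within the partition)."""
    return value >> PARTITION_SHIFT, value & ((1 << PARTITION_SHIFT) - 1)


# Two hypothetical 6-row DataFrames with different partition layouts.
ids_a = simulate_ids([3, 3])
ids_b = simulate_ids([2, 4])
matched = set(ids_a) & set(ids_b)
print(len(ids_a), len(ids_b), len(matched))  # 6 6 5
```

Here only 5 of the 6 IDs appear on both sides, so an inner join on them would return 5 rows instead of 6.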
C
5

You just don't. This is not an applicable use case for monotonically_increasing_id, which is by definition non-deterministic. Instead:

  • convert to RDD
  • zipWithIndex
  • convert back to DataFrame.
  • join
Chiropractor answered 2/6, 2017 at 21:0 Comment(1)
monotonically_increasing_id is supposed to always generate different numbers: spark.apache.org/docs/3.1.3/api/python/reference/api/… The solution for getting consecutive numbers is to generate monotonically_increasing_id and then apply a window:

df = df.withColumn("monotonically_increasing_id", monotonically_increasing_id())
window = Window.orderBy(col("monotonically_increasing_id"))
df = df.withColumn("row_id", row_number().over(window))
df = df.drop("monotonically_increasing_id")

Donettedoney
B
0

You can generate the IDs with monotonically_increasing_id, save the DataFrame to disk, read it back in, and only then do the join. I would only suggest this approach if you need to generate the IDs just once. At that point they are fixed and can be used for joining, but for the reasons mentioned above this is hacky and not a good solution for anything that runs regularly.

Bregenz answered 7/1, 2022 at 21:12 Comment(0)
D
0

If you want an incremental number on both DataFrames and then join on it, you can generate a consecutive number by combining monotonically_increasing_id with a window function, using the following code:

from pyspark.sql.functions import col, monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# Number the rows of df1 consecutively, ordered by the generated ID.
df1 = df1.withColumn("monotonically_increasing_id", monotonically_increasing_id())
window = Window.orderBy(col("monotonically_increasing_id"))
df1 = df1.withColumn("match_id", row_number().over(window))
df1 = df1.drop("monotonically_increasing_id")

# Do the same for cont_data.
cont_data = cont_data.withColumn("monotonically_increasing_id", monotonically_increasing_id())
window = Window.orderBy(col("monotonically_increasing_id"))
cont_data = cont_data.withColumn("match_id", row_number().over(window))
cont_data = cont_data.drop("monotonically_increasing_id")

# Join on the consecutive number and drop the duplicate key column.
cont_data = cont_data.join(df1, df1.match_id == cont_data.match_id, 'inner').drop(df1.match_id)

Warning: this may move all the data to a single partition, because the window has no partitioning. So it may be better to put match_id in a separate DataFrame together with the monotonically_increasing_id, generate the consecutive number there, and then join it back to the data.

Donettedoney answered 27/7, 2022 at 19:31 Comment(0)
