Pyspark: Joining 2 dataframes by ID & Closest date backwards
Asked Answered
C

3

6

I'm having the world of issues performing a rolling join of two dataframes in pyspark (and python in general). I am looking to join two pyspark dataframes together by their ID & closest date backwards (meaning the date in the second dataframe cannot be greater than the one in the first)

Table_1:

+-----+------------+-------+
| ID  |    Date    | Value |
+-----+------------+-------+
| A1  | 01-15-2020 |     5 |
| A2  | 01-20-2020 |    10 |
| A3  | 02-21-2020 |    12 |
| A1  | 02-21-2020 |     6 |
+-----+------------+-------+

Table_2:

enter image description here

Desired Result:

ID   Date    Value   Value 2
A1  01-15-2020  5   5
A2  01-20-2020  10  12
A3  02-21-2020  12  14
A1  01-21-2020  6   3

In essence, I understand an SQL Query can do the trick where I can do spark.sql("query") So anything else. I've tried several things which aren't working in a spark context. Thanks!

Chandrachandragupta answered 8/8, 2020 at 2:31 Comment(0)
H
3
df1=spark.createDataFrame([('A1','1/15/2020',5),
                           ('A2','1/20/2020',10), 
                           ('A3','2/21/2020',12),
                           ('A1','1/21/2020',6)],
                           ['ID1','Date1','Value1'])

df2=spark.createDataFrame([('A1','1/10/2020',1),
                           ('A1','1/12/2020',5),
                           ('A1','1/16/2020',3),
                           ('A2','1/25/2020',20),
                           ('A2','1/1/2020',12),
                           ('A3','1/31/2020',14),
                           ('A3','1/30/2020',12)],['ID2','Date2','Value2'])

df2=df1.join(df2,df1.ID1==df2.ID2) \
    .withColumn("distance",datediff(to_date(df1.Date1,'MM/dd/yyyy'),\
     to_date(df2.Date2,'MM/dd/yyyy'))).filter("distance>0")

df2.groupBy(df2.ID1,df2.Date1,df2.Value1)\
   .agg(min(df2.distance).alias('distance')).join(df2, ['ID1','Date1','distance'])\
   .select(df2.ID1,df2.Date1,df2.Value1,df2.Value2).orderBy('ID1','Date1').show()
Holoblastic answered 9/8, 2020 at 1:28 Comment(2)
I like the idea, but could you please explain what you are doing in your code?Magda
Generally speaking : it's doing a N-N comparison of all the dates with the same ID, which is then having a N^2 complexity. It computes the distance for each pair, and keep the lower distance's ones. If both dataframes are big (many times the same ID) it could lead to some strong performance impact.Licit
P
2

Here is my trial.

First, I determine the Date_2 which met your condition. After that, join the second dataframe again and get the Value_2

from pyspark.sql.functions import monotonically_increasing_id, unix_timestamp, max

df3 = df1.withColumn('newId', monotonically_increasing_id()) \
  .join(df2, 'ID', 'left') \
  .where(unix_timestamp('Date', 'M/dd/yy') >= unix_timestamp('Date_2', 'M/dd/yy')) \
  .groupBy(*df1.columns, 'newId') \
  .agg(max('Date_2').alias('Date_2'))
df3.orderBy('newId').show(20, False)    

+---+-------+-----+-----+-------+
|ID |Date   |Value|newId|Date_2 |
+---+-------+-----+-----+-------+
|A1 |1/15/20|5    |0    |1/12/20|
|A2 |1/20/20|10   |1    |1/11/20|
|A3 |2/21/20|12   |2    |1/31/20|
|A1 |1/21/20|6    |3    |1/16/20|
+---+-------+-----+-----+-------+

df3.join(df2, ['ID', 'Date_2'], 'left') \
  .orderBy('newId') \
  .drop('Date_2', 'newId') \
  .show(20, False)

+---+-------+-----+-------+
|ID |Date   |Value|Value_2|
+---+-------+-----+-------+
|A1 |1/15/20|5    |5      |
|A2 |1/20/20|10   |12     |
|A3 |2/21/20|12   |14     |
|A1 |1/21/20|6    |3      |
+---+-------+-----+-------+
Prolix answered 8/8, 2020 at 11:4 Comment(0)
D
1

I would prefer to solve this problem using Window.
You need to join both datasets using id and date(>=), then you need to know how many days of difference you have in order to filter what you need using dense_rank to just get closest date.

from pyspark.sql.functions import col, datediff, dense_rank
from pyspark.sql.window import Window
from datetime import date

df1 = (
  spark
  .createDataFrame(
    [
      ("A1",date(2020, 1, 15), 5),
      ("A2",date(2020, 1, 20), 10),
      ("A3",date(2020, 2, 21), 12),
      ("A1",date(2020, 1, 21), 6),
    ],
    ["id_1","date_1","value_1"]
  )
)

df2 = (
  spark
  .createDataFrame(
    [
      ("A1",date(2020, 1, 10), 1),
      ("A1",date(2020, 1, 12), 5),
      ("A1",date(2020, 1, 16), 3),
      ("A2",date(2020, 1, 25), 20),
      ("A2",date(2020, 1, 1), 12),
      ("A3",date(2020, 1, 31), 14),
      ("A3",date(2020, 1, 30), 12)
    ],
    ["id_2","date_2","value_2"]
  )
)

winSpec = Window.partitionBy("value_1").orderBy("date_difference")

df3 = (
  df1
  .join(df2, [df1.id_1==df2.id_2,df1.date_1>=df2.date_2])
  .withColumn("date_difference", datediff("date_1","date_2"))
  .withColumn("dr", dense_rank().over(winSpec))
  .where("dr=1")
  .select(
    col("id_1").alias("id"),
    col("date_1").alias("date"),
    col("value_1"),
    col("value_2")
  )
)

+---+----------+-------+-------+
|id |date      |value_1|value_2|
+---+----------+-------+-------+
|A1 |2020-01-21|6      |3      |
|A1 |2020-01-15|5      |5      |
|A2 |2020-01-20|10     |12     |
|A3 |2020-02-21|12     |14     |
+---+----------+-------+-------+
Decare answered 10/3, 2022 at 20:17 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.