I would solve this with a Window function.
Join both datasets on the id and on date_1 >= date_2, compute the difference in days between the two dates, and then use dense_rank over that difference to keep only the closest date for each row of df1.
from pyspark.sql.functions import col, datediff, dense_rank
from pyspark.sql.window import Window
from datetime import date
df1 = (
    spark
    .createDataFrame(
        [
            ("A1", date(2020, 1, 15), 5),
            ("A2", date(2020, 1, 20), 10),
            ("A3", date(2020, 2, 21), 12),
            ("A1", date(2020, 1, 21), 6),
        ],
        ["id_1", "date_1", "value_1"]
    )
)
df2 = (
    spark
    .createDataFrame(
        [
            ("A1", date(2020, 1, 10), 1),
            ("A1", date(2020, 1, 12), 5),
            ("A1", date(2020, 1, 16), 3),
            ("A2", date(2020, 1, 25), 20),
            ("A2", date(2020, 1, 1), 12),
            ("A3", date(2020, 1, 31), 14),
            ("A3", date(2020, 1, 30), 12)
        ],
        ["id_2", "date_2", "value_2"]
    )
)
# one window per df1 row (id_1, date_1), ordered by the day gap,
# so the closest date_2 gets rank 1
winSpec = Window.partitionBy("id_1", "date_1").orderBy("date_difference")
df3 = (
    df1
    # keep only df2 rows with the same id and an earlier or equal date
    .join(df2, [df1.id_1 == df2.id_2, df1.date_1 >= df2.date_2])
    # days between the two dates (always >= 0 thanks to the join condition)
    .withColumn("date_difference", datediff("date_1", "date_2"))
    # rank the candidates per df1 row, closest date first
    .withColumn("dr", dense_rank().over(winSpec))
    .where("dr = 1")
    .select(
        col("id_1").alias("id"),
        col("date_1").alias("date"),
        col("value_1"),
        col("value_2")
    )
)
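Calling show on the result reproduces the output below (truncate=False keeps the full column values):

df3.show(truncate=False)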
+---+----------+-------+-------+
|id |date |value_1|value_2|
+---+----------+-------+-------+
|A1 |2020-01-21|6 |3 |
|A1 |2020-01-15|5 |5 |
|A2 |2020-01-20|10 |12 |
|A3 |2020-02-21|12 |14 |
+---+----------+-------+-------+
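One caveat: dense_rank keeps every df2 row tied for the closest date, so a single df1 row can match several df2 rows when two dates are equally close. If you need exactly one match per row, a sketch of the same pipeline using row_number instead (reusing winSpec from above; df3_single is just an illustrative name, and ties are broken arbitrarily unless you add a tie-breaker to the ordering) would look like this:

from pyspark.sql.functions import row_number

df3_single = (
    df1
    .join(df2, [df1.id_1 == df2.id_2, df1.date_1 >= df2.date_2])
    .withColumn("date_difference", datediff("date_1", "date_2"))
    # row_number yields exactly one row per window, even when differences tie
    .withColumn("rn", row_number().over(winSpec))
    .where("rn = 1")
    .select(
        col("id_1").alias("id"),
        col("date_1").alias("date"),
        col("value_1"),
        col("value_2")
    )
)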