Only the destination table needs to be a Delta table. The data that you're planning to merge in is not required to be a Delta table. It really depends on which API you're using:
- If you're using the Python API, then you can use the dataframe as-is (the example is based on the docs):
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/events/")
updatesDF = .... # your transformed dataframe
deltaTable.alias("target").merge(
updatesDF.alias("updates"),
"target.col1 = updates.col1") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
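If you don't want to overwrite or insert all columns, the same merge builder also accepts explicit clauses with an optional condition. A minimal sketch of a conditional update; the col2 and updated_at columns are just made-up examples, not part of the original question:
deltaTable.alias("target").merge(
    updatesDF.alias("updates"),
    "target.col1 = updates.col1") \
  .whenMatchedUpdate(
    condition="updates.updated_at > target.updated_at",
    set={"col2": "updates.col2", "updated_at": "updates.updated_at"}) \
  .whenNotMatchedInsertAll() \
  .execute()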
- If you're using the SQL MERGE command, you can register a temp view for your dataframe and use it as the source in the MERGE SQL command:
updates_df.createOrReplaceTempView("updates")
merge_sql = f"""
merge into target
using updates
ON source.col1 == target.col1
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
updates_df._jdf.sparkSession().sql(merge_sql)
The only catch here is that you need to use df._jdf.sparkSession().sql to execute the SQL command in the same Spark session where you registered the temp view.
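As a side note: recent PySpark versions (3.3+, if I remember correctly) expose the session as a public df.sparkSession property, which avoids reaching into the private _jdf accessor; please verify against your Spark version before relying on it:
# same idea as above, but via the public property instead of _jdf
updates_df.sparkSession.sql(merge_sql)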