If you have a primary key (like UUID), and the duplicates are based on a certain combination of columns (e.g., Col1, Col2, Col3), then you can use the ROW_NUMBER() approach to get a list of UUIDs of the duplicate rows to delete. As a side note, Delta tables don't have ROWID or sequences to auto-generate primary keys at this time.
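For reference, here is a minimal sketch of that primary-key approach (separate from the main example below). It assumes a hypothetical table path and a UUID primary-key column named Id, with duplicates defined by Col1, Col2, Col3:
from delta.tables import DeltaTable
from pyspark.sql import functions as f
from pyspark.sql.window import Window

path = "abfss://[email protected]/my-path/my-keyed-delta-table"  # hypothetical path
df = spark.read.format("delta").load(path)

# Rank the copies within each (Col1, Col2, Col3) group; rn > 1 marks the extra copies.
# Ordering by Id is arbitrary -- it only decides which copy survives.
w = Window.partitionBy("Col1", "Col2", "Col3").orderBy("Id")
dupIds = df.withColumn("rn", f.row_number().over(w)).filter("rn > 1").select("Id")

# Delete only the extra copies, targeted by their unique Id.
DeltaTable.forPath(spark, path).alias("main").merge(
    dupIds.alias("dups"), "main.Id = dups.Id"
).whenMatchedDelete().execute()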
If your duplicates are based on a certain composite key (e.g., Col2, Col4, Col7) and there is no primary key to tell the copies apart, the ROW_NUMBER() trick alone will not work: any DELETE keyed on those columns removes every copy of the row. For this scenario, you can get rid of the duplicates in a Delta table by combining the Delta MERGE and Delta Time Travel (versioning) features. Here are the steps to eliminate full duplicates (rows where all the corresponding fields have identical values):
- Get a dataframe with the distinct rows that have duplicates in the Delta table. The ROW_NUMBER() function will help you here.
- Use the MERGE operation with WHEN MATCHED DELETE to remove these rows. Note that if you have 4 copies of the same row, this deletes all 4 copies; we will re-add a single copy of each in Step 5.
- Use the DESCRIBE HISTORY command to get the version number of the Delta table before the current one. That version still has all the duplicates, while the current version doesn't.
- Repeat Step 1, but this time use the VERSION AS OF option to get the dataframe with the distinct rows we deleted in Step 2.
- Use the MERGE operation with WHEN NOT MATCHED INSERT ALL to add back the distinct rows that used to have duplicates.
Here is a Python example that I tested on Azure Databricks with a Delta table stored in ADLS.
from delta.tables import *
from pyspark.sql import functions as f
# Step 1
dfTemp = (
    spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://[email protected]/my-path/my-delta-table`")
    .filter(f.col('rn') > 1)
    .drop('rn')
    .distinct()
)
# Step 2
deltaTemp = DeltaTable.forPath(spark, "abfss://[email protected]/my-path/my-delta-table")
deltaTemp.alias("main").merge(
dfTemp.alias("nodups"),
"main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenMatchedDelete().execute()
Now, get the version number of the Delta table before the merge we performed in Step 2 above. I ran this command as SQL in my Azure Databricks notebook:
%sql
-- Step 3
DESCRIBE HISTORY delta.`abfss://[email protected]/my-path/my-delta-table`
Let's say my current Delta table version is 50, and the previous version (with the duplicates) was 49. You can now execute the remaining steps below:
# Step 4
dfTemp = (
    spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://[email protected]/my-path/my-delta-table` VERSION AS OF 49")
    .filter(f.col('rn') > 1)
    .drop('rn')
    .distinct()
)
# Step 5
deltaTemp = DeltaTable.forPath(spark, "abfss://[email protected]/my-path/my-delta-table")
deltaTemp.alias("main").merge(
dfTemp.alias("nodups"),
"main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenNotMatchedInsertAll().execute()
When you have to deal with partial duplicates (rows where the key columns are the same, but some of the other fields have different values), you will probably end up coding the logic for "marking" which of the duplicate rows to keep. That logic fully depends on your particular use case; a sketch of one possible approach follows.
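As an illustration only, here is a sketch of that "marking" step: rank the copies within each key with ROW_NUMBER() over a tie-breaking column, keep the rank-1 row, and feed only the keys that actually have duplicates into Steps 2-5 above. UpdatedAt is a hypothetical column name standing in for whatever field decides which copy wins.
from pyspark.sql import functions as f
from pyspark.sql.window import Window

df = spark.read.format("delta").load("abfss://[email protected]/my-path/my-delta-table")

# Rank copies inside each key; the "winner" (rn = 1) is the most recently updated copy.
# UpdatedAt is a hypothetical column -- use whatever field drives your keep/discard rule.
key = ["Col2", "Col4", "Col7"]
w = Window.partitionBy(*key).orderBy(f.col("UpdatedAt").desc())
ranked = (
    df.withColumn("rn", f.row_number().over(w))
      .withColumn("cnt", f.count(f.lit(1)).over(Window.partitionBy(*key)))
)

# Keys that have more than one copy, and the single copy of each key to keep.
dupKeys  = ranked.filter("cnt > 1").select(*key).distinct()
keepRows = ranked.filter("cnt > 1 AND rn = 1").drop("rn", "cnt")

# dupKeys plays the role of the Step 1/2 dataframe (delete every copy of these keys),
# and keepRows plays the role of the Step 4 dataframe (re-insert only the chosen copy in Step 5).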