How to drop duplicates in Delta Table?

There is a function to delete data from a Delta table:

deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete(col("date") < "2017-01-01")

But is there also a way to drop duplicates somehow? Like deltaTable.dropDuplicates()...

I don't want to read the whole table as a dataframe, drop the duplicates, and rewrite it to the storage again.

Cytochrome answered 8/5, 2020 at 7:48 Comment(1)
May be of help: docs.delta.io/latest/delta-update.html – Leolaleoline

If you have a primary key (like UUID), and the duplicates are based on a certain combination of columns (e.g., Col1, Col2, Col3), then you can use the ROW_NUMBER() approach to get a list of UUIDs of the duplicate rows to delete. As a side note, Delta tables don't have ROWID or sequences to auto-generate primary keys at this time.
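
For illustration, a minimal PySpark sketch of that ROW_NUMBER() approach might look like the following (the table path, the `id` UUID column, and the ordering column are placeholders to adapt to your table):

from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window

path = "/path/to/my-delta-table"  # placeholder path
df = spark.read.format("delta").load(path)

# rn = 1 is the copy we keep; every row with rn > 1 is an extra copy,
# and its UUID tells us exactly which physical row to delete
w = Window.partitionBy("Col1", "Col2", "Col3").orderBy(F.col("id"))
dupe_ids = df.withColumn("rn", F.row_number().over(w)).filter("rn > 1").select("id")

# delete only those UUIDs from the Delta table
DeltaTable.forPath(spark, path).alias("t").merge(
    dupe_ids.alias("d"), "t.id = d.id"
).whenMatchedDelete().execute()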

If your duplicates are based on a certain composite key (e.g., Col2, Col4, Col7), the ROW_NUMBER() trick to get rid of the duplicates will not work: it will delete all copies of the row. For this scenario, you can get rid of the duplicates in a Delta table using a combination of Delta MERGE and Delta Time Travel (versioning) features. Here are the steps to eliminate the full duplicates (the rows where all the corresponding fields have identical values):

  1. Get a dataframe with the distinct rows that have duplicates in the Delta table. ROW_NUMBER() function will help you here.
  2. Use MERGE operation and WHEN MATCHED DELETE to remove these rows. Notice that if you have 4 copies of the same row, it will delete all 4 copies. We will re-add non-duplicates in Step 5.
  3. Use DESCRIBE HISTORY command to get the version number of the Delta table before the current. That version still has all the duplicates, while the current version doesn't.
  4. Repeat Step 1, but this time use VERSION AS OF option to get the dataframe with the distinct rows we deleted in Step 2.
  5. Use MERGE operation and WHEN NOT MATCHED INSERT ALL to add the distinct rows that used to have duplicates.

Here is a Python example that I tested on Azure Databricks with a Delta table stored in ADLS.

from delta.tables import *
import pyspark.sql.functions as f

# Step 1
dfTemp = (
    spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://[email protected]/my-path/my-delta-table`")
    .filter(f.col('rn') > 1)
    .drop('rn')
    .distinct()
)

# Step 2
deltaTemp = DeltaTable.forPath(spark, "abfss://[email protected]/my-path/my-delta-table")

deltaTemp.alias("main").merge(
    dfTemp.alias("nodups"),
    "main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenMatchedDelete().execute()

Now, get the version number of the Delta table before the merge we performed in Step 2 above. I ran this command as SQL in my Azure Databricks notebook:

%sql

-- Step 3
DESCRIBE HISTORY delta.`abfss://[email protected]/my-path/my-delta-table`

Let's say my current Delta table version is 50 and my previous version (with the duplicates) was 49. You can now execute the remaining steps below:

# Step 4
dfTemp = (
    spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://[email protected]/my-path/my-delta-table` VERSION AS OF 49")
    .filter(f.col('rn') > 1)
    .drop('rn')
    .distinct()
)

# Step 5
deltaTemp = DeltaTable.forPath(spark, "abfss://[email protected]/my-path/my-delta-table")

deltaTemp.alias("main").merge(
    dfTemp.alias("nodups"),
    "main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenNotMatchedInsertAll().execute()

When you have to deal with partial duplicates (the rows where the primary keys are the same, but some other corresponding fields have different values), you will probably end up coding the logic for "marking" which one of the duplicate rows to keep. The logic will fully depend on your particular use case.
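
The "marking" logic will vary, but purely as an illustration, one common choice is to keep the newest row per key. A sketch, assuming a hypothetical UpdatedAt timestamp column on the same (Col2, Col4, Col7) key and a placeholder path:

from delta.tables import DeltaTable
from pyspark.sql import functions as F

path = "/path/to/my-delta-table"  # placeholder path
df = spark.read.format("delta").load(path)

# per duplicate key, the newest UpdatedAt marks the row we keep
keepers = df.groupBy("Col2", "Col4", "Col7").agg(F.max("UpdatedAt").alias("KeepUpdatedAt"))

# delete every row that is strictly older than the keeper;
# rows that tie on UpdatedAt all survive and would need an extra tiebreaker
DeltaTable.forPath(spark, path).alias("main").merge(
    keepers.alias("k"),
    "main.Col2 = k.Col2 AND main.Col4 = k.Col4 AND main.Col7 = k.Col7 "
    "AND main.UpdatedAt < k.KeepUpdatedAt"
).whenMatchedDelete().execute()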

Giacomo answered 5/10, 2021 at 18:2 Comment(0)

Yes, you can delete duplicates directly from a Delta table using the MERGE command. The command below retains only the latest record per primary key and deletes the rest of the redundant data.

MERGE INTO [deltatable] AS target
USING (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY [primary keys] ORDER BY [date] DESC) AS rn
  FROM [deltatable]
  QUALIFY rn > 1
) AS source
ON [merge primary keys and date column between source and target]
WHEN MATCHED THEN DELETE

There are multiple other ways as well, but they are more time-consuming.

Example:

MERGE INTO delta_table AS target
USING (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY pk1 ORDER BY date1 DESC) AS rn
  FROM delta_table
  QUALIFY rn > 1
) AS source
ON source.pk1 = target.pk1 AND source.date1 = target.date1
WHEN MATCHED THEN DELETE
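
If the SQL QUALIFY form gives you trouble in a notebook, roughly the same dedup can be sketched with the PySpark merge API (same pk1/date1 names and delta_table as in the example above; treat it as a sketch, not a drop-in):

from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window

tbl = DeltaTable.forName(spark, "delta_table")
w = Window.partitionBy("pk1").orderBy(F.col("date1").desc())

# rows with rn > 1 are the older copies we want to remove
dups = (tbl.toDF()
        .withColumn("rn", F.row_number().over(w))
        .filter("rn > 1")
        .select("pk1", "date1")
        .distinct())

tbl.alias("target").merge(
    dups.alias("source"),
    "source.pk1 = target.pk1 AND source.date1 = target.date1"
).whenMatchedDelete().execute()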
Endowment answered 30/12, 2020 at 7:4 Comment(3)
Hey Veeresh, can you add an example for the above answer? It will help to understand how we can use it. – Leckie
For anyone who doesn't want to lose their mind: this doesn't work on Databricks as of today. It won't pick up the rn alias. Maybe I'm just dumb, but I couldn't get this setup to work. – Enforce
I have edited the code as per recent updates. @Enforce check it now; you don't have to lose your mind over a piece of code. – Endowment

We assume the primary key is columns a, b, c, the timestamp column is d, and we are going to keep the latest version of the data:

MERGE INTO delta.`/mnt/lei/dupTab` AS target
USING (
  WITH t AS (
    SELECT a, b, c, d, ROW_NUMBER() OVER (PARTITION BY a, b, c ORDER BY d DESC) AS rn
    FROM delta.`/mnt/lei/dupTab`
  )
  SELECT * FROM t WHERE rn > 1
) AS source
ON source.a = target.a AND source.b = target.b AND source.c = target.c AND source.d = target.d
WHEN MATCHED THEN DELETE
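
After the merge, a quick sanity check for leftover duplicate keys (same table path as above) might look like:

# any (a, b, c) key still appearing more than once?
spark.sql("""
  SELECT a, b, c, COUNT(*) AS cnt
  FROM delta.`/mnt/lei/dupTab`
  GROUP BY a, b, c
  HAVING COUNT(*) > 1
""").show()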
Muriate answered 20/1, 2022 at 2:8 Comment(1)
The question is primarily about existing data, not new data coming in. – Rhetorical

Not the most efficient, but the simplest solution.

  1. Read the subset of rows from the Delta table that may contain duplicates, drop the duplicates, and store the result somewhere for now:
import pyspark.sql.functions as F
from delta.tables import DeltaTable

df = spark.read.table("schema.table_name")

# let's limit only to records that may contain duplicates
# for instance: all records that have user_id is NULL
df = df.filter(F.isnull('user_id'))

# provide a subset of columns to identify duplicates
df = df.dropDuplicates(["birthday", "name"])

# let's store this somewhere, e.g. s3 folder
df.write.parquet("s3://bucket/temp_folder")
  2. Drop the full subset of records that contain duplicates from the Delta table (SQL):
DELETE FROM schema.table_name WHERE user_id IS NULL
  3. Merge the cached data back into the Delta table:
# our cached records are on s3, so:
df = spark.read.parquet("s3://bucket/temp_folder")

# merge into Delta Table providing a simple merge condition:
delta_table = DeltaTable.forName(spark, "schema.table_name")
delta_table.alias("t").merge(
    df.alias("b"),
    "b.user_id = t.user_id AND b.birthday = t.birthday AND b.name = t.name"
).whenNotMatchedInsertAll().execute()

Hope this gives some ideas. It could probably be simplified by caching just in memory instead of S3; I'm not that fluent with Spark yet.
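
For what it's worth, a rough sketch of the in-memory variant might look like the code below. The caveat (noted in the comments) is that if Spark evicts the cached data it will recompute it from the table, which by then no longer contains those rows, so writing to durable storage as above is arguably the safer option.

import pyspark.sql.functions as F
from delta.tables import DeltaTable

# persist and materialize the deduped subset BEFORE deleting from the table
df = (spark.read.table("schema.table_name")
      .filter(F.isnull("user_id"))
      .dropDuplicates(["birthday", "name"])
      .persist())
df.count()  # force materialization while the source rows still exist

spark.sql("DELETE FROM schema.table_name WHERE user_id IS NULL")

delta_table = DeltaTable.forName(spark, "schema.table_name")
delta_table.alias("t").merge(
    df.alias("b"),
    "b.user_id = t.user_id AND b.birthday = t.birthday AND b.name = t.name"
).whenNotMatchedInsertAll().execute()
df.unpersist()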

Abdominous answered 30/12, 2022 at 14:38 Comment(0)

It's possible that, given this is Databricks, the method is simpler.

CREATE TABLE UnDuped AS
Select DISTINCT * FROM Table

This doesn't work with a normal SQL Server query. Databricks uses Parquet-based tables, which take a columnar approach, so it distinguishes duplicates differently than the RBAR (row by agonizing row) fashion found in SQL Server. I gave credit to Veeresh, but if it is this simple, it may be worth going as above rather than using a row_number() function or a "WHEN MATCHED DELETE". Still, his method provides alternatives that look good. Alternatively:

with cte as (
  select col1, col2, col3, etc
       , row_number() over (partition by col1, col2, col3, etc order by col1) as rowno
  from table
)
delete from cte where rowno > 1

Make sure every column is included in the row_number() partition and it will find the dupes, delete the ones with a rowno value greater than one, and presto, they are gone. More important here is the ORDER BY. For example, if you want the last file entered as a dupe to be the item removed, you can drop the date out of the partition and order by date, which will give you all the dupes ordered by date (or date desc in this case). The rowno will then point to the latest date and the rest will go away.
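
A rough PySpark equivalent of that ordering trick, with placeholder table and column names, might look like:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# partition by the business columns only and order by the load date descending,
# so rowno = 1 is the copy you keep and everything else is a dupe to drop
w = Window.partitionBy("col1", "col2", "col3").orderBy(F.col("load_date").desc())

deduped = (spark.read.table("my_table")   # placeholder table name
           .withColumn("rowno", F.row_number().over(w))
           .filter("rowno = 1")
           .drop("rowno"))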

Imam answered 1/2, 2023 at 20:25 Comment(1)
I like this on the read side to provide some temporary mitigation, but fixing the data is what everyone using my tables really wants from me. +1 for being relevant and useful! – Pike

I wrote a Python function to dedup a Delta table. It uses Spark SQL.

def dedup_delta_table(table='mytable', pks=['id','line'], orderby='order by update_date desc', db='database', userdb='userdb'):
    ### given a table, remove rows with a duplicated primary key
    # table: string, table name
    # pks: primary keys or unique keys of the table
    # orderby: SQL ORDER BY clause; to dedup, rows with the same primary keys are ordered by the specified column(s) and the first row is kept
    # db: database where the table resides
    # userdb: a database to save the deduped rows temporarily

    spark.sql(f"""use    {db}  """)

    ###sql query parts
    groupby_str=','.join(pks)
    joinOn_str=' AND '.join([f'a.{col} = b.{col}' for col in pks])
    allcols=spark.sql(f"""select *   from  {table}   limit 1""").columns
    allcols_str=','.join(allcols)

    ###1.extract dup rows and dedup them:  deduped rows
    spark.sql(f"""
    select {allcols_str}
    from(
        select *
        ,row_number()over(partition by {groupby_str} {orderby} ) rn   
        ,count(*)over(partition by {groupby_str}) cnt
        from   {table} 
        )c
    where cnt>=2 and rn=1 
    """).write.saveAsTable(f"{userdb}.{table}_dedup_rows_tb",mode='Overwrite')
    
    ##2.delete all dup rows by merge
    spark.sql(f"""
    MERGE INTO {table} a
    USING {userdb}.{table}_dedup_rows_tb b
    on {joinOn_str}
    WHEN MATCHED THEN DELETE
    """).toPandas()

    ##3.insert deduped rows
    spark.sql(f"""
    INSERT INTO {table}
    SELECT * 
    from {userdb}.{table}_dedup_rows_tb
    """)
    #drop the temporary table
    spark.sql(f"""drop table if exists   {userdb}.{table}_dedup_rows_tb    """)

#### example
spark.sql(f"""use  userdb  """)
spark.sql("""CREATE TABLE  my_delta_table (
  id INT,
  line INT,
  update_date TIMESTAMP
) USING DELTA; """)
spark.sql("""INSERT INTO my_delta_table VALUES (1111, 1, '2024-01-29'),
                                    (2222, 2, '2024-01-30'),
                                    (2222, 2, '2024-01-30');""")
spark.sql("""SELECT * FROM my_delta_table""").toPandas()
dedup_delta_table(table='my_delta_table',pks=['id','line'],orderby='order by update_date desc', db='userdb',userdb='userdb')
Bedivere answered 30/1 at 16:48 Comment(0)
