Auto increment id in delta table while inserting
Asked Answered
C

2

9

I have a problem regarding merging csv files using pysparkSQL with delta table. I managed to create upsert function that update if matched and insert if not matched.

I want to add column ID to the final delta table and increment it each time we insert data. This column identify each row in our delta table. Is there any way to put that in place ?

def Merge(dict1, dict2):
    res = {**dict1, **dict2}
    return res

def create_default_values_dict(correspondance_df,marketplace):
    dict_output = {}
    for field in get_nan_keys_values(get_mapping_dict(correspondance_df, marketplace)):
        dict_output[field] = 'null'
        # We want to increment the id row each time we perform an insertion (TODO TODO TODO)
#         if field == 'id':
#             dict_output['id'] = col('id')+1
#         else:    
    return dict_output


def create_matched_update_dict(mapping, products_table, updates_table):
    output = {}
    for k,v in mapping.items():
        if k == 'source_name':
            output['products.source_name'] = lit(v)
        else:
            output[products_table + '.' + k] = F.when(col(updates_table + '.' + v).isNull(), col(products_table + '.' + k)).when(col(updates_table + '.' + v).isNotNull(), col(updates_table + '.' + v))     
    return output    

insert_dict = create_not_matched_insert_dict(mapping, 'products', 'updates')
default_dict = create_default_values_dict(correspondance_df_products, 'Cdiscount')

insert_values = Merge(insert_dict, default_dict)
update_values = create_matched_update_dict(mapping, 'products', 'updates')

delta_table_products.alias('products').merge(
    updates_df_table.limit(20).alias('updates'),
    "products.barcode_ean == updates.ean") \
    .whenMatchedUpdate(set = update_values) \
    .whenNotMatchedInsert(values = insert_values)\
    .execute()

I tried to increment the column id in the function create_default_values_dict but it's seems to not working well, it doesn't auto increment by 1. Is there another way to solve this problem ? Thanks in advance :)

Cashmere answered 3/12, 2019 at 11:38 Comment(0)
S
7

Databricks has IDENTITY columns for hosted Spark

https://docs.databricks.com/sql/language-manual/sql-ref-syntax-ddl-create-table-using.html#parameters

GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY
 [ ( [ START WITH start ] [ INCREMENT BY step ] ) ] 

This works on Delta tables.

Example:

create table gen1 (
     id long GENERATED ALWAYS AS IDENTITY
   , t string
)

Requires Runtime version 10.4 or above.

Shady answered 1/3, 2022 at 15:39 Comment(6)
It is not working in databricks. Could you please lay down a exampleDulci
@vikasmadoori added example and minimum DBR version requirement.Shady
As per databricks, this identity may not workColbert
@SurenderRaja mind sharing a link?Shady
@Shady community.databricks.com/t5/data-engineering/…Colbert
@SurenderRaja it doesn't read this "may not work". This reads "Databricks Delta Lake does not guarantee consecutive identity values because of its distributed nature". Which is expected in distributed in nature. If look at sequence objects in other systems like Oracle, it also doesn't guarantee there will be no gaps in sequence numbers. Sorry if you meant something else.Shady
G
1

Delta does not support auto-increment column types.

In general, Spark doesn't use auto-increment IDs, instead favoring monotonically increasing IDs. See functions.monotonically_increasing_id().

If you want to achieve auto-increment behavior you will have to use multiple Delta operations, e.g., query the max value + add it to a row_number() column computed via a window function + then write. This is problematic for two reasons:

  1. Unless you introduce an external locking mechanism or some other way to ensure that no updates to the table happen in-between finding the max value and writing, you can end up with invalid data.

  2. Using row_number() will reduce parallelism to 1, forcing all the data through a single core, which will be very slow with large data.

Bottom line, you really do not want to use auto-increment columns with Spark.

Hope this helps.

Gamophyllous answered 21/1, 2020 at 7:33 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.