I have a problem regarding merging csv files using pysparkSQL with delta table. I managed to create upsert function that update if matched and insert if not matched.
I want to add column ID
to the final delta table and increment it each time we insert data. This column identify each row in our delta table. Is there any way to put that in place ?
def Merge(dict1, dict2):
res = {**dict1, **dict2}
return res
def create_default_values_dict(correspondance_df,marketplace):
dict_output = {}
for field in get_nan_keys_values(get_mapping_dict(correspondance_df, marketplace)):
dict_output[field] = 'null'
# We want to increment the id row each time we perform an insertion (TODO TODO TODO)
# if field == 'id':
# dict_output['id'] = col('id')+1
# else:
return dict_output
def create_matched_update_dict(mapping, products_table, updates_table):
output = {}
for k,v in mapping.items():
if k == 'source_name':
output['products.source_name'] = lit(v)
else:
output[products_table + '.' + k] = F.when(col(updates_table + '.' + v).isNull(), col(products_table + '.' + k)).when(col(updates_table + '.' + v).isNotNull(), col(updates_table + '.' + v))
return output
insert_dict = create_not_matched_insert_dict(mapping, 'products', 'updates')
default_dict = create_default_values_dict(correspondance_df_products, 'Cdiscount')
insert_values = Merge(insert_dict, default_dict)
update_values = create_matched_update_dict(mapping, 'products', 'updates')
delta_table_products.alias('products').merge(
updates_df_table.limit(20).alias('updates'),
"products.barcode_ean == updates.ean") \
.whenMatchedUpdate(set = update_values) \
.whenNotMatchedInsert(values = insert_values)\
.execute()
I tried to increment the column id
in the function create_default_values_dict
but it's seems to not working well, it doesn't auto increment by 1. Is there another way to solve this problem ? Thanks in advance :)