Databricks Delta Live Tables - Apply Changes from delta table

I am working with Databricks Delta Live Tables, but have some problems upserting some tables upstream. I know it is quite a long text below, but I tried to describe my problem as clearly as possible. Let me know if some parts are not clear.

I have the following tables and flow:

Landing_zone -> This is a folder in which JSON files are added that contain data of inserted or updated records.
Raw_table -> This is the data from the JSON files, but in table format. This table is in Delta format. No transformations are done, except for transforming the JSON structure into a tabular structure (I did an explode and then created columns from the JSON keys).
Intermediate_table -> This is the raw_table, but with some extra columns (depending on other column values).

To go from my landing zone to the raw table I have the following Pyspark code:

import dlt

cloudFilesOptions = {"cloudFiles.format": "JSON",
                     "cloudFiles.schemaLocation": sourceschemalocation,
                     "cloudFiles.inferColumnTypes": True}

@dlt.view(name='landing_view')
def inc_view():
    df = (spark
          .readStream
          .format('cloudFiles')
          .options(**cloudFilesOptions)
          .load(filpath_to_landing))
    # <Some transformations to go from JSON to tabular (explode, ...)>
    return df

dlt.create_target_table('raw_table', 
                        table_properties = {'delta.enableChangeDataFeed': 'true'})
  
dlt.apply_changes(target='raw_table',
                  source='landing_view',
                  keys=['id'],
                  sequence_by='updated_at')

This code works as expected: I run it, add a changes.JSON file to the landing zone, rerun the pipeline, and the upserts are correctly applied to 'raw_table'.

(However, each time a new parquet file with all the data is created in the delta folder; I would have expected that only a parquet file with the inserted and updated rows was added, and that some information about the current version was kept in the delta logs. Not sure if this is relevant for my problem. I already changed the table_properties of 'raw_table' to enableChangeDataFeed = true; the readStream for 'intermediate_table' then has .option('readChangeFeed', 'true').)
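
As a side note, the change feed of 'raw_table' can be inspected outside of DLT to see what actually gets recorded per commit (a minimal sketch; the starting version is just an example, and it assumes the table is published to the metastore under that name):

# Read the change data feed of raw_table starting from an example version.
changes = (spark.read
                .format('delta')
                .option('readChangeFeed', 'true')
                .option('startingVersion', 1)   # example version, adjust as needed
                .table('raw_table'))

# Each row is tagged as insert, update_preimage, update_postimage or delete,
# which is the row-level view of what apply_changes wrote in that commit.
changes.select('id', '_change_type', '_commit_version').show()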

Then I have the following code to go from my 'raw_table' to my 'intermediate_table':

@dlt.table(name='V_raw_table', table_properties={'delta.enableChangeDataFeed': 'true'})
def raw_table():
    df = (spark.readStream
               .format('delta')
               .option('readChangeFeed', 'true')
               .table('LIVE.raw_table'))
    df = df.withColumn('ExtraCol', <Transformation>)
    return df

dlt.create_target_table('intermediate_table')
dlt.apply_changes(target='intermediate_table',
                  source='V_raw_table',
                  keys=['id'],
                  sequence_by='updated_at')

Unfortunately, when I run this, I get the error: 'Detected a data update (for example part-00000-7127bd29-6820-406c-a5a1-e76fc7126150-c000.snappy.parquet) in the source table at version 2. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.'

I looked into 'ignoreChanges', but I don't think this is what I want. I would expect the autoloader to be able to detect the changes in the delta table and pass them through the flow.
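
For reference, this is what setting that option on the stream over 'raw_table' would look like (a minimal sketch; as far as I understand, with ignoreChanges the rewritten data files are re-emitted in full, so downstream would receive unchanged rows again rather than a clean row-level change feed):

df = (spark.readStream
           .format('delta')
           .option('ignoreChanges', 'true')   # suppresses the 'data update detected' error
           .table('LIVE.raw_table'))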

I am aware that readStream only works with appends, but that is why I would expect that, after 'raw_table' is updated, a new parquet file with only the inserts and updates would be added to the delta folder. This added parquet file would then be detected by the autoloader and could be used to apply the changes to 'intermediate_table'.

Am I doing this the wrong way? Or am I overlooking something? Thanks in advance!

Bedwell answered 15/6, 2022 at 9:42 Comment(2)
When you run the pipeline, at what stage do you see the error? If your LIVE.raw_table has breaking updates, that is, existing data files need to be rewritten, autoloader may not work. – Damselfish
We have the same issue -- it would be good to get clarity on the appropriate handling of this use case. – Seagraves

As readStream only works with appends, any change in the source files will create issues downstream. The assumption that an update on "raw_table" will only insert a new parquet file is incorrect. With or without settings like "optimized writes", apply_changes can add or remove files. You can find this information in your "raw_table/_delta_log/xxx.json" under "numTargetFilesAdded" and "numTargetFilesRemoved".
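
You can also surface the same metrics with DESCRIBE HISTORY instead of opening the log JSON by hand (a small sketch; the table name is assumed to match the pipeline's target):

# operationMetrics is a map column; MERGE commits carry the file add/remove counts.
history = spark.sql("DESCRIBE HISTORY raw_table")
(history.selectExpr("version", "operation",
                    "operationMetrics['numTargetFilesAdded'] AS files_added",
                    "operationMetrics['numTargetFilesRemoved'] AS files_removed")
        .show())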

Basically, "Databricks recommends you use Auto Loader to ingest only immutable files".

Schuller answered 23/9, 2022 at 14:5 Comment(1)
It's not related to the question... The author isn't reading changed files using cloudFiles; the author tries to read data from the change data feed. – Alphabetize

When you change the settings to include .option('readChangeFeed', 'true'), you should start with a full refresh (there is a dropdown near Start). Doing this will resolve the 'Detected a data update xxx' error, and your code should work for the incremental updates.
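
If you prefer to trigger it programmatically instead of through the dropdown, the Pipelines REST API accepts a full_refresh flag when starting an update (a sketch based on my reading of the Delta Live Tables API; the workspace URL, token, and pipeline ID below are placeholders):

import requests

resp = requests.post(
    "https://<workspace-url>/api/2.0/pipelines/<pipeline-id>/updates",
    headers={"Authorization": "Bearer <personal-access-token>"},
    json={"full_refresh": True},  # reprocess all data and reset the streaming state
)
resp.raise_for_status()
print(resp.json())  # contains the update_id of the triggered run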

Bela answered 20/10, 2022 at 23:1 Comment(0)
