Querying with dbt from external source

I have the following issue:

  • I have an AWS S3 pipeline that spits out a single json.gz file on a daily basis.
  • I wish to take that file with dbt and load it into Snowflake (no Snowpipe use atm).

I have managed to do this by creating a storage integration, and with my role (the one used for running dbt) I manually created a schema and granted usage on it. So far so good.
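
For reference, the Snowflake-side setup looks roughly like this (the integration, bucket, ARN, and role names below are placeholders, not my real ones):

-- rough sketch of the storage integration, schema, grant, and stage described above;
-- all names and locations are placeholders
create storage integration if not exists s3_mixpanel_int
  type = external_stage
  storage_provider = 'S3'
  enabled = true
  storage_aws_role_arn = 'arn:aws:iam::<account-id>:role/<snowflake-access-role>'
  storage_allowed_locations = ('s3://<my-mixpanel-bucket>/exports/');

create schema if not exists RAW_DEV.S3_MIXPANEL;
grant usage on schema RAW_DEV.S3_MIXPANEL to role <dbt_role>;

create stage if not exists RAW_DEV.S3_MIXPANEL.my_s3_stage
  storage_integration = s3_mixpanel_int
  url = 's3://<my-mixpanel-bucket>/exports/'
  file_format = (type = json compression = gzip);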

Then I read about this:

https://github.com/fishtown-analytics/dbt-external-tables

The problem is that this is the only way it runs properly: I had to alter my dbt profiles.yml, set the default schema to S3_MIXPANEL and the default database to RAW_DEV, and run with a separate target and role via the --target 'ingest_dev' parameter.
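
For context, the extra target in my profiles.yml looks roughly like this (profile name, account, user, role, and warehouse are placeholders, and authentication is omitted):

my_profile:
  target: dev
  outputs:
    ingest_dev:
      type: snowflake
      account: <my_account>
      user: <my_user>
      # password / key-pair auth omitted
      role: <ingest_role>
      database: RAW_DEV
      schema: S3_MIXPANEL
      warehouse: <my_warehouse>
      threads: 1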

I keep thinking there should be a more sophisticated solution, where I can create schemas, query the metadata, and use something like {{ source() }} so that my documentation can show that this is an external source. I don't think dbt-external-tables is explained well enough for my case here.

Can anyone please help and share how to create schemas and query from external stages properly, without changing the default schema macro and dbt profiles.yml each time?

I have managed to run the following code:

{{
  config(
    materialized = 'incremental',
    schema = generate_schema_name('S3_MIXPANEL')
  )
}}

SELECT
    metadata$filename as file_name,
    to_date(SUBSTR(metadata$filename, 16, 10), 'yyyy/mm/dd') as event_date,
    $1 as payload,
    CONVERT_TIMEZONE('Europe/London', TO_TIMESTAMP_TZ($1:properties:mp_processing_time_ms::int / 1000)) as event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as ingested_at

FROM @my_s3_stage

{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date > (
        SELECT max(event_date)
        FROM {{ this }}
    )
{% endif %}

{{ row_limit() }}

EDIT 22-06-20:

I have added the src_mixpanel.yml file to my models and ran the dbt command, but I also had to specify the data_types, so I added those too. Then I apparently had to add the "macro" to my macros as well (btw, maybe a stupid question, but I don't really know how to install your package, so I manually copied all of your macros into mine).
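
(I assume the proper way is to declare the package in a packages.yml at the project root and run dbt deps, roughly like the sketch below, but I haven't tried that yet; the revision is just an example:)

packages:
  - git: "https://github.com/fishtown-analytics/dbt-external-tables"
    revision: 0.2.0  # example tag only; pick one compatible with your dbt version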

Now when I run this operation:

dbt run-operation stage_external_sources

with this src_mixpanel.yml:

version: 2

sources:

  - name: s3_mixpanel
    database: RAW_DEV
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          auto_refresh: false # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
              data_type: date
            - name: file_name
              expression: metadata$filename
              data_type: string
          columns:
            - name: properties
              data_type: variant

I get an error:

Encountered an error while running operation: Compilation Error in macro stage_external_sources (macros/stage_external_sources.sql)
'dict object' has no attribute 'sources'

Bashkir answered 20/7, 2020 at 9:19 Comment(0)

As the maintainer of the dbt-external-tables package, I'll share its opinionated view. The package believes that you should stage all external sources (S3 files) as external tables or with snowpipes first, in a process that includes as little confounding logic as possible. Then you can select from them, as sources, in dbt models, alongside all requisite business logic.

If my understanding is correct, you would stage your mixpanel data as below, in a file called (e.g.) models/staging/mixpanel/src_mixpanel.yml:

version: 2

sources:

  - name: s3_mixpanel
    database: raw_dev
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          file_format: "( type = json )"  # or a named file format
          auto_refresh: false # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
          columns:
            - name: properties
              data_type: variant

You would run this macro from the package to create the external table—and, after creation, to update its partition metadata if you don't have auto_refresh enabled (see Snowflake docs):

dbt run-operation stage_external_sources
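
For the YAML above, that operation issues DDL roughly of this shape; this is only a sketch, as the package generates the exact statements (including the column expressions on top of value) for you:

create or replace external table raw_dev.s3_mixpanel.events (
    event_date date as (to_date(substr(metadata$filename, 16, 10), 'yyyy/mm/dd')),
    properties variant as (value:properties::variant)
)
partition by (event_date)
location = @my_s3_stage
auto_refresh = false
file_format = ( type = json );

-- and, because auto_refresh is off, a partition-metadata refresh on each run:
alter external table raw_dev.s3_mixpanel.events refresh;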

You can then select from this source in an incremental model, like the one you have above. Now, event_date is a partition column on this external table, so filtering on it should enable Snowflake to prune files (though that's been inconsistent historically for dynamic, subquery-derived filters).

{{
  config(
    materialized ='incremental'
  )
}}
 
SELECT
    metadata$filename as file_name,
    event_date,
    value as payload,
    properties:mp_processing_time_ms::int / 1000 as event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as modeled_at

FROM {{ source('s3_mixpanel', 'events') }}

{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date > (
        SELECT max(event_date)
        FROM {{ this }}
    )
{% endif %}

{{ row_limit() }}
Juridical answered 20/7, 2020 at 13:6 Comment(11)
Hey, I tried but did not succeed; I edited my initial post with more data. – Bashkir
Check out the docs on installing packages here! As for the error you're seeing: what version of dbt are you using? – Juridical
I am using version 0.16.1. – Bashkir
Got it: you'd need to use dbt-external-tables v0.2.0. As of v0.3.0, the package requires dbt v0.17.0 due to a change in the way sources are recorded in the graph (hence the error message you're seeing). – Juridical
I have added 0.3.2 from git and upgraded dbt to 0.17.2. I now get: Running with dbt=0.17.2 13:01:32 + 1 of 1 START external source s3_mixpanel.events 13:01:33 + 1 of 1 (1) create or replace external table RAW_DEV.s3_mixpan... Encountered an error while running operation: Database Error 002003 (02000): SQL compilation error: File format 'NONE' does not exist or not authorized. – Bashkir
That's progress! You need to specify a file_format for the macro to use when creating Snowflake external tables. I edited my answer above to include file_format in the YML config. – Juridical
I have success, but I see only VALUE and EVENT_DATE; I wish to "break down" this external table to file_name too, lol. Sorry for the late reply, but I got a "success": root@54c0239ce4f6:/app# dbt run-operation stage_external_sources --target 'ingest_dev' Running with dbt=0.17.2 09:15:35 + 1 of 1 START external source s3_mixpanel.events 09:15:36 + 1 of 1 (1) create or replace external table RAW_DEV.s3_mixpan... 09:15:38 + 1 of 1 (1) SUCCESS 1. Now what? Can I replace the reference in my script from @stage_name to {{ source() }}? THANK YOU! – Bashkir
Another stupid question, but how do I refresh the data? Do I need to run this dbt-external-tables operation every day as part of my daily dbt schedule to update columns? I don't want to use the S3 SQS service; I am using Airflow to run dbt DAGs. – Bashkir
Glad it worked! You can select the pseudocolumn metadata$filename from any Snowflake external table to see the name of the actual source file. – Juridical
And yes. Since you don't have auto_refresh enabled (that would be the S3 SQS approach), Snowflake needs you to run alter external table ... refresh to update the external table's metadata (docs). The stage_external_sources operation takes care of running that for you. If you need to update the columns in the external table definition, you'll want to "full refresh" it: dbt run-operation stage_external_sources --vars 'ext_full_refresh: true' – Juridical
In the partitions args, we should pass a data_type. – Inly
