Using BigQueryInsertJobOperator instead of BigQueryExecuteQueryOperator

I recently updated my Airflow installation and saw that BigQueryExecuteQueryOperator has been deprecated in favor of BigQueryInsertJobOperator. The documentation seemed rather vague, linking to REST Resource: jobs (and Method: jobs.query). In particular, it's unclear to me whether there's anywhere I can specify write_disposition, destination_dataset_table, etc. I want to make sure I'm not making things overly complicated.

Where I currently do

# my.sql
SELECT * FROM `proj.ds.table_1`
---------------------------------
# my-dag.py
BigQueryExecuteQueryOperator(
    task_id='copy-table-1',
    sql='my.sql',
    destination_dataset_table='proj:ds.table_2',
    write_disposition='WRITE_EMPTY',
    dag=dag
)

do I now need to use DDL statements like

# my.sql
CREATE TABLE IF NOT EXISTS
ds.table_2
AS (
  SELECT * FROM `proj.ds.table_1`
)
---------------------------------
# my-dag.py
BigQueryInsertJobOperator(
    task_id='copy-table-1',
    configuration={
        'query': {
            'query': open('my.sql').read(),
            'useLegacySql': False,
        }
    },
    dag=dag
)

Not to mention that I'd apparently have to pass parameters to the query in the format suggested by the QueryParameter docs instead of just params={ 'table': THE_TABLE }...
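
(For context, the QueryParameter format referred to here looks roughly like the sketch below, based on the JobConfigurationQuery fields in the REST docs; the parameter name and value are made up. Note that BigQuery query parameters can only stand in for values, not identifiers such as table names, so they don't cover the params={ 'table': ... } use case anyway.)

# my-dag.py (sketch of the queryParameters format; parameter name and value are made up)
BigQueryInsertJobOperator(
    task_id='parameterised-query',
    configuration={
        'query': {
            'query': 'SELECT * FROM `proj.ds.table_1` WHERE id >= @min_id',
            'useLegacySql': False,
            'parameterMode': 'NAMED',
            'queryParameters': [
                {
                    'name': 'min_id',
                    'parameterType': {'type': 'INT64'},
                    'parameterValue': {'value': '100'},
                },
            ],
        }
    },
    dag=dag,
)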

Sitin answered 31/3, 2021 at 21:6 Comment(2)
I'm also having some issues understanding the new operator, but I asked in the Airflow channel and they pointed me to the example DAG that shows this: github.com/apache/airflow/tree/… – Mintun
The Google API is not low-hanging fruit. Hope the following is of some help: cloud.google.com/bigquery/docs/reference/rest/v2/… cloud.google.com/bigquery/docs/reference/rest/v2/… – Scornik

This is the API documentation to follow for the BigQueryInsertJobOperator: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery.

Rather than mess with Google's parameterized SQL queries, I'm using this method to run templated queries and write the results to BQ:

# my.sql
SELECT * FROM `{PROJECT}.{DATASET}.{TBL_TO_MOVE}`
---------------------------------
# my-dag.py
PROJECT = 'my-project'
DATASET = 'my-dataset'
TBL_TO_MOVE = 'some-table'

DESTINATION_DS = 'other-dataset'
DESTINATION_TBL = 'other-table'

BigQueryInsertJobOperator(
    task_id='copy-table-1',
    configuration={
        'query': {
            'query': open('my.sql', 'r').read().format(**locals()),
            'destinationTable': {
                'projectId': PROJECT,
                'datasetId': DESTINATION_DS,
                'tableId': DESTINATION_TBL
            },
            'useLegacySql': False,
            'allowLargeResults': True,
        }
    },
    dag=dag
)

The open('my.sql', ...) call reads the SQL file and then substitutes the local variables for the brace placeholders (e.g. {PROJECT} gets replaced by my-project).
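
To tie this back to the original question: write_disposition and destination_dataset_table from the old operator map to the writeDisposition and destinationTable keys of the same query configuration. A minimal sketch, reusing the placeholder variables above (the disposition values are just examples):

# my-dag.py
BigQueryInsertJobOperator(
    task_id='copy-table-2',
    configuration={
        'query': {
            'query': open('my.sql', 'r').read().format(**locals()),
            'destinationTable': {                      # was destination_dataset_table
                'projectId': PROJECT,
                'datasetId': DESTINATION_DS,
                'tableId': DESTINATION_TBL,
            },
            'writeDisposition': 'WRITE_EMPTY',         # was write_disposition
            'createDisposition': 'CREATE_IF_NEEDED',   # API default; shown for completeness
            'useLegacySql': False,
        }
    },
    dag=dag,
)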

Sitin answered 24/5, 2021 at 23:9 Comment(0)

I used the same method that @zack did, but with Jinja's include syntax to load the SQL file:

"query": "{% include 'path/to/file.sql' %}"

Searle answered 17/10, 2022 at 8:19 Comment(0)

To add more to the conversation: passing the query as a path to a .sql file is even easier.

BigQueryInsertJobOperator(
    task_id='create_table',
    configuration={
        'query': {
            'query': 'path/to/file.sql',
            'useLegacySql': False,
        }
    },
)
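
This works because configuration is a templated field and .sql is among the operator's template extensions, so a path ending in .sql is rendered to the file's contents. If the file does not live next to the DAG file, template_searchpath on the DAG is one way to make the relative path resolvable; a minimal sketch, with a made-up include directory:

# my-dag.py (sketch; /opt/airflow/include is a made-up directory)
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
    dag_id='bq_copy_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,                                  # Airflow 2.4+ argument name
    template_searchpath=['/opt/airflow/include'],   # where path/to/file.sql is looked up
) as dag:
    BigQueryInsertJobOperator(
        task_id='create_table',
        configuration={
            'query': {
                'query': 'path/to/file.sql',
                'useLegacySql': False,
            }
        },
    )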
Reborn answered 12/9, 2023 at 10:35 Comment(0)

As an addition, here's the Airflow documentation with examples of BigQueryInsertJobOperator:

https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/bigquery.html#howto-operator-bigqueryinsertjoboperator

Aleydis answered 3/11, 2023 at 9:27 Comment(0)