Airflow xcom_pull only returns a string

I have an Airflow pipeline in which I need to get a filename from a Pub/Sub subscription and then import that file into a Cloud SQL instance. I use the CloudSqlInstanceImportOperator to import the CSV file. This operator needs a body, which contains the filename and other parameters. Since I read the filename at runtime, I also have to define the body at runtime. This all works. But when I pull the body from XCom, it returns a string instead of a Python dictionary, so the CloudSqlInstanceImportOperator gives me the following error (my guess is because the body is a string and not a dictionary):

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 715, in execute
    self._validate_body_fields()
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 712, in _validate_body_fields
    api_version=self.api_version).validate(self.body)
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 420, in validate
    dictionary_to_validate=body_to_validate)
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 341, in _validate_field
    value = dictionary_to_validate.get(field_name)
AttributeError: 'str' object has no attribute 'get'

This is the code I use:

import json 
import os
from datetime import datetime, timedelta
import ast
from airflow import DAG
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.pubsub_sensor import PubSubPullSensor
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceImportOperator


def create_dag(dag_id, default_args):
    BUCKET = "{{ var.value.gp2pg_bucket }}"
    GCP_PROJECT_ID = "{{ var.value.gp2pg_project_id }}"
    INSTANCE_NAME = "{{ var.value.gp2pg_instance_name }}"

    def define_import_body(file, **kwargs):
        import_body = {
            "importContext": {
                "importUser": "databaseuser",
                "database": "databasename",
                "fileType": "csv",
                "uri": "gs://bucketname/" + file,
                "csvImportOptions": {
                    "table": "schema.tablename",
                    "columns": ["column1",
                                "column2"]}
            }
        }
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='import_body', value=import_body)
        print(import_body)

    def get_filename(var, **kwargs):
        message = ast.literal_eval(var)
        file = message[0].get('message').get('attributes').get('objectId')
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='filename', value=file)
        print(file)

    dag = DAG(dag_id=dag_id, schedule_interval=None, default_args=default_args)

    with dag:
        t1 = PubSubPullSensor(task_id='pull-messages',
                              project="projectname",
                              ack_messages=True,
                              max_messages=1,
                              subscription="subscribtionname")


        message = "{{ task_instance.xcom_pull() }}"

        t2 = PythonOperator(
            task_id='get_filename',
            python_callable=get_filename,
            op_kwargs={'var': message},
            provide_context=True,
        )

        file = "{{ task_instance.xcom_pull(task_ids='get_filename', key='filename') }}"

        t3 = PythonOperator(
            task_id='define_import_body',
            python_callable=define_import_body,
            op_kwargs={'file': file},
            provide_context=True,
        )

        import_body = "{{ task_instance.xcom_pull(task_ids='define_import_body', key='import_body') }}"

        t4 = CloudSqlInstanceImportOperator(
            project_id=GCP_PROJECT_ID,
            body=import_body,
            instance=INSTANCE_NAME,
            gcp_conn_id='postgres_default',
            task_id='sql_import_task',
            validate_body=True,
        )

        t5 = GoogleCloudStorageToGoogleCloudStorageOperator(
            task_id='copy_files',
            source_bucket=BUCKET,
            source_object=file,
            destination_bucket=BUCKET,
            destination_object='processed/import/' + file,
        )

        t1 >> t2 >> t3 >> t4 >> t5

    return dag


dags_folder = os.getenv('DAGS_FOLDER', "./dags")
with open(f'{dags_folder}/gp2pg/flow_config.json', 'r') as f:
    flow_config = f.read()
for key, values in json.loads(flow_config).items():
    default_args = {
        "owner": "owner",
        "start_date": datetime(2020, 1, 1),
        "email": [],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }

    dag_id = f"gp2pg_{key}_data_to_pg"

    globals()[dag_id] = create_dag(dag_id, default_args)

Any idea how I could solve that problem?

Cavity answered 18/11, 2020 at 15:01

EDIT:

For Airflow >= 2.1.0: Airflow added the ability to render fields as native Python objects. You need to set render_template_as_native_obj=True in your DAG constructor. You can follow this documentation example.
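
A minimal sketch of that change applied to the DAG constructor from the question (assuming Airflow >= 2.1):

dag = DAG(
    dag_id=dag_id,
    schedule_interval=None,
    default_args=default_args,
    # Templated fields (e.g. the xcom_pull for import_body) now render
    # as native Python objects instead of strings.
    render_template_as_native_obj=True,
)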

Original Answer:

First, CloudSqlInstanceImportOperator is deprecated. You should use CloudSQLImportInstanceOperator from the providers package.

The body param needs to be a dict, as explained in the docs.
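
For reference, the provider import would look roughly like this (a sketch; it assumes the apache-airflow-providers-google package, or the backport providers package on Airflow 1.10, is installed):

from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLImportInstanceOperator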

XCom is a table in the Airflow metadata database, and the data is saved as strings. You can't store a dict in a database, since a dict is an in-memory Python object. What you have is probably JSON (a string). Try converting it to a dict:

body=json.loads(import_body) 
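
Note that json.loads assumes the rendered string is valid JSON. If the template renders the dict through Python's repr (single quotes), json.loads will raise an error; ast.literal_eval, which the question already uses for the Pub/Sub message, handles that form (a sketch under that assumption):

import ast

# Parses repr-style strings such as "{'importContext': {...}}"
body = ast.literal_eval(import_body)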

EDIT: (after discussion in comments)

You will need to wrap the operator with a PythonOperator, so you can convert the XCom value to a dict and use it:

def my_func(ds, **kwargs):
    ti = kwargs['ti']
    body = ti.xcom_pull(task_ids='previous_task_id')
    import_body = json.loads(body)
    op = CloudSqlInstanceImportOperator(
        project_id=GCP_PROJECT_ID,
        body=import_body,
        instance=INSTANCE_NAME,
        gcp_conn_id='postgres_default',
        task_id='sql_import_task',
        validate_body=True,
    )
    # execute() requires the task context; an empty call will fail
    op.execute(context=kwargs)


p = PythonOperator(
    task_id='python_task',
    python_callable=my_func,
    provide_context=True,  # Airflow 1.10.x: required so kwargs contains 'ti'
)

Note that calling an operator inside an operator is not best practice (the reason is explained in this answer); you can avoid it altogether on Airflow >= 2.1, as explained in the first paragraph.

Beaulahbeaulieu answered 18/11, 2020 at 15:16
Hmm, I have the newest version of apache-airflow in my Python environment but cannot find the providers folder in it. body = json.loads(import_body) does not work; now the whole pipeline fails. I also would have to cast the import_body in the CloudSqlInstanceImportOperator, since it has to be cast during runtime. – Cavity
providers is from Airflow 2.0; to use it you will need to install it like any Python package. See the link I added to pypi.org/project/apache-airflow-backport-providers-google – Beaulahbeaulieu
Ah, my bad, I did not see the link to the providers pip page. – Cavity
Unfortunately not. Since the XCom value is pulled during runtime, I cannot cast the value directly in the code; I would have to cast it in the operator. And when I try that, the whole pipeline fails with an error. – Cavity
What error? You can change the pipeline as follows: add a PythonOperator; in the Python callable, pull the XCom, convert it to a dict, then set up op = CloudSqlInstanceImportOperator and call op.execute(). Basically, wrap the Cloud SQL actions with a PythonOperator. – Beaulahbeaulieu
render_template_as_native_obj is a very cool-looking feature; unfortunately, and as too often with Airflow, it doesn't work for me (Airflow 2.4.3). Fields are still rendered as strings, not Python objects, even though render_template_as_native_obj is set to True. – Memnon
@Memnon, not sure what you mean by that. It works fine for me and many others. If you have an issue, I suggest opening a question about your specific case. – Beaulahbeaulieu

As of Airflow 2.1 (to be released soon), you can pass render_template_as_native_obj=True to the DAG, and Airflow will return the Python type (dict, int, etc.) instead of a string. No other code changes are needed. See this pull request.

from airflow import DAG
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_template_as_python_object",
    schedule_interval=None,
    start_date=days_ago(2),
    render_template_as_native_obj=True,
)
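
With that flag set, a templated pull like the one in the question should render as the pushed dict rather than its string form (a sketch reusing the question's task ids):

# Renders as the dict pushed by define_import_body, not as a string:
import_body = "{{ task_instance.xcom_pull(task_ids='define_import_body', key='import_body') }}"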
Hurling answered 16/5, 2021 at 22:41
