DAG import error: AttributeError: '_TaskDecorator' object has no attribute 'update_relative'
I'm facing an issue where my DAG cannot be imported, but I can't figure out why:

from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task, dag

@dag(
    dag_id="database_monitor",
    schedule_interval='*/10 * * * *',
    start_date=pendulum.datetime(2023, 7, 16, 21, 0, tz="UTC"),
    catchup=False,
)
def Pipeline():

    check_db_alive = SqlSensor(
        task_id="check_db_alive",
        conn_id="evergreen",
        sql="SELECT pg_is_in_recovery()",
        success=lambda x: x == False,
        poke_interval=60,
        # timeout = 60 * 2,
        mode="reschedule",
    )


    @task()
    def alert_of_db_inrecovery():
        import requests
        # result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"

        data = {"@key":"kkll",
                "@version" : "alertapi-0.1",
                "@type":"ALERT",
                "object" : "Testobject",
                "severity" : "MINOR",
                "text" : str("Former primary instance is in recovery")
            }
        requests.post('https://httpevents.systems/api/sendAlert',verify=False,data=data)


    check_db_alive >> alert_of_db_inrecovery


dag = Pipeline()

I get this error:

AttributeError: '_TaskDecorator' object has no attribute 'update_relative'

Krems answered 18/7, 2023 at 7:56

You need to call the TaskFlow-decorated function. With @task applied, alert_of_db_inrecovery is only a decorator wrapper, not a task: until it is called, no task exists inside the DAG, so the >> operator has nothing to wire up and Airflow raises the update_relative error.

Change check_db_alive >> alert_of_db_inrecovery to check_db_alive >> alert_of_db_inrecovery()
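
In general, calling a @task-decorated function returns an XComArg, and that is what can participate in dependency arrows. A minimal sketch of the pattern (the DAG and task names here are illustrative, not from your code):

import pendulum
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator


@dag(
    dag_id="taskflow_call_demo",
    schedule_interval=None,
    start_date=pendulum.datetime(2023, 7, 16, tz="UTC"),
    catchup=False,
)
def demo():
    start = EmptyOperator(task_id="start")

    @task
    def hello():
        print("hello")

    # hello by itself is just the decorator wrapper and cannot be wired into
    # dependencies; calling it instantiates the task inside the DAG.
    start >> hello()


demo()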

Here is the corrected code:

from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task, dag


@dag(
    dag_id="database_monitor",
    schedule_interval='*/10 * * * *',
    start_date=pendulum.datetime(2023, 7, 16, 21, 0, tz="UTC"),
    catchup=False,
)
def Pipeline():
    check_db_alive = SqlSensor(
        task_id="check_db_alive",
        conn_id="evergreen",
        sql="SELECT pg_is_in_recovery()",
        success=lambda x: x == False,
        poke_interval=60,
        # timeout = 60 * 2,
        mode="reschedule",
    )

    @task
    def alert_of_db_inrecovery():
        import requests
        # result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"

        data = {"@key": "kkll",
            "@version": "alertapi-0.1",
            "@type": "ALERT",
            "object": "Testobject",
            "severity": "MINOR",
            "text": str("Former primary instance is in recovery")
            }
        requests.post('https://httpevents.systems/api/sendAlert', verify=False, data=data)

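    # Calling the decorated function (note the parentheses) creates the task.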
    check_db_alive >> alert_of_db_inrecovery()


dag = Pipeline()
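
To confirm the fix without waiting for the scheduler, you can run the DAG file directly with Python (for example, python database_monitor.py; use whatever your file is named) to surface parse-time errors, or run airflow dags list-import-errors to see anything the scheduler failed to import.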

Ref: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html

Burletta answered 18/7, 2023 at 8:28
