I'm facing an issue which my dag cannot be imported, but cannot figure out why:
from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task,dag
@dag(
dag_id = "database_monitor",
schedule_interval = '*/10 * * * *',
start_date=pendulum.datetime(2023, 7, 16, 21,0,tz="UTC"),
catchup=False,)
def Pipeline():
check_db_alive = SqlSensor(
task_id="check_db_alive",
conn_id="evergreen",
sql="SELECT pg_is_in_recovery()",
success= lambda x: x == False,
poke_interval= 60,
#timeout = 60 * 2,
mode = "reschedule",
)
@task()
def alert_of_db_inrecovery():
import requests
# result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"
data = {"@key":"kkll",
"@version" : "alertapi-0.1",
"@type":"ALERT",
"object" : "Testobject",
"severity" : "MINOR",
"text" : str("Former primary instance is in recovery")
}
requests.post('https://httpevents.systems/api/sendAlert',verify=False,data=data)
check_db_alive >> alert_of_db_inrecovery
dag = Pipeline()
I get this error:
AttributeError: '_TaskDecorator' object has no attribute 'update_relative'