Airflow failed slack message
E

6

5

How can I configure Airflow so that any failure in the DAG will (immediately) result in a slack message?

At this moment I manage it by creating a slack_failed_task:

slack_failed_task =  SlackAPIPostOperator(
    task_id='slack_failed',
    channel="#datalabs",
    trigger_rule='one_failed',
    token="...",
    text = ':red_circle: DAG Failed',
    icon_url = 'http://airbnb.io/img/projects/airflow3.png',
    dag=dag)

And set this task (one_failed) downstream of every other task in the DAG:

slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e

It works, but it's error-prone: forgetting to wire a new task to it silently skips the Slack notification, and it seems like a lot of work.
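If the extra task is kept, the wiring can at least be automated by looping over the DAG's tasks instead of listing them by hand. A minimal sketch, assuming Airflow's `dag.tasks` list and the `set_downstream` method on operators; the helper name `wire_failure_notifier` is hypothetical:

```python
def wire_failure_notifier(tasks, notifier):
    """Make `notifier` a downstream task of every other task in `tasks`.

    Equivalent to writing `notifier << task` for each task by hand.
    Returns the ids of the tasks that were wired.
    """
    wired = []
    for task in tasks:
        if task is notifier:
            continue  # never wire the notifier to itself
        task.set_downstream(notifier)
        wired.append(task.task_id)
    return wired


# Assumed usage in the DAG file, after all tasks are defined:
# wire_failure_notifier(dag.tasks, slack_failed_task)
```

This removes the risk of forgetting a task, but it still adds an extra node to the graph.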

Is there perhaps a way to expand on the email_on_failure property in the DAG?

Bonus ;-) for including a way to pass the name of the failed task to the message.

Exasperate answered 16/6, 2017 at 9:58 Comment(2)
Hey Tom, I noticed at the bottom you put "Is there perhaps a way to expand on the email_on_failure property in the DAG?" - I do not know if you are still interested, but you may benefit from reading my post on how I have configured my airflow settings for emails on DAG failures using Amazon SES and the email_on_failure property. SO Post Here. Thanks!Pontifical
Thanks, but I just wanted Slack messages to begin with :-) I thought the email_on_failure could be a sink to send messages to that then are either mailed or slacked or ...Exasperate
B
20

Maybe this example will be helpful:

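def slack_failed_task(contextDictionary, **kwargs):
    # Build the operator inside the callback and run it immediately
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#datalabs",
        token="...",
        text=':red_circle: DAG Failed',
        owner='_owner',
    )
    # execute must be called, not just returned as a method reference
    return failed_alert.execute(context=contextDictionary)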


task_with_failed_slack_alerts = PythonOperator(
    task_id='task0',
    python_callable=<file to execute>,
    on_failure_callback=slack_failed_task,
    provide_context=True,
    dag=dag)
Brendis answered 27/6, 2017 at 18:40 Comment(0)
S
5

Try the new SlackWebhookOperator, which is available in Airflow >= 1.10.0:

import os

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slack_msg="Hi Wssup?"

slack_test =  SlackWebhookOperator(
    task_id='slack_test',
    http_conn_id='slack_connection',
    webhook_token='/1234/abcd',
    message=slack_msg,
    channel='#airflow_updates',
    username='airflow_'+os.environ['ENVIRONMENT'],
    icon_emoji=None,
    link_names=False,
    dag=dag)

Note: Make sure you have slack_connection added in your Airflow connections as

host=https://hooks.slack.com/services/
Sports answered 4/12, 2018 at 6:12 Comment(2)
It's been more than a year & at my previous job, so I cannot test it anytime soon, but thanks anyway for responding.Exasperate
No problem. It might help others :)Sports
R
4

How can I configure Airflow so that any failure in the DAG will (immediately) result in a slack message?

You can achieve that with airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook, by passing an on_failure_callback function at the DAG level.

Bonus ;-) for including a way to pass the name of the failed task to the message.


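import uuid
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.utils.state import State

def fail():
    raise Exception("Task failed intentionally for testing purpose")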

def success():
    print("success")

def task_fail_slack_alert(context):
    tis_dagrun = context['ti'].get_dagrun().get_task_instances()
    failed_tasks = []
    for ti in tis_dagrun:
        if ti.state == State.FAILED:
            # Adding log url
            failed_tasks.append(f"<{ti.log_url}|{ti.task_id}>")
    
    dag=context.get('task_instance').dag_id
    exec_date=context.get('execution_date')

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": ":red_circle: Dag Failed."
            }
        },
        {
            "type": "section",
            "block_id": f"section{uuid.uuid4()}",
            "text": {
                "type": "mrkdwn",
                "text": f"*Dag*: {dag} \n *Execution Time*: {exec_date}"
            },
            "accessory": {
                "type": "image",
                "image_url": "https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png",
                "alt_text": "Airflow"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"Failed Tasks: {', '.join(failed_tasks)}"
            }
        }
    ]
    failed_alert = SlackWebhookHook(
        http_conn_id='slack-airflow',
        channel="#airflow-notifications",    
        blocks=blocks,
        username='airflow'
    )
    failed_alert.execute()
    return 

default_args = {
    'owner': 'airflow'
}
with DAG(
    dag_id="slack-test",
    default_args=default_args,
    start_date=datetime(2021,8,19),
    schedule_interval=None,
    on_failure_callback=task_fail_slack_alert
) as dag:

    task_1 = PythonOperator(
        task_id="slack_notification_test",
        python_callable=fail
    )

    task_2 = PythonOperator(
        task_id="slack_notification_test2",
        python_callable=success
    )

Slack Message Preview

Roller answered 19/8, 2021 at 13:31 Comment(0)
L
3

The BaseOperator supports 'on_failure_callback' parameter:

on_failure_callback (callable) – a function to be called when a task instance of this task fails. a context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.

I have not tested this but you should be able to define a function which posts to slack on failure and pass it to each task definition. To get the name of the current task, you can use the {{ task_id }} template.
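Since the callback receives the context dictionary directly, the task name can also be read from it in plain Python rather than through a Jinja template. A minimal sketch, assuming the context exposes the task instance under the `task_instance` key with `task_id`, `dag_id` and `log_url` attributes; the function name `build_failure_text` is hypothetical:

```python
def build_failure_text(context):
    """Build a Slack message naming the failed task from a callback context."""
    ti = context["task_instance"]
    return (
        ":red_circle: Task failed.\n"
        f"*Dag*: {ti.dag_id}\n"
        f"*Task*: {ti.task_id}\n"
        f"*Log*: {ti.log_url}"
    )
```

The returned string can then be used as the message text of whichever Slack operator or hook the callback uses.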

Laceration answered 26/6, 2017 at 9:29 Comment(2)
I couldn't get the {{ task_id }} to work yet, but your help was appreciatedExasperate
It's also the issue I'm having. I can get variables directly from the context var, but templating such as {{ task_id }} does not workLalita
C
1

I would prefer to add the callback to the DAG itself, so that it fires whenever a failing task causes the DAG run to fail:

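import json
import os
from datetime import datetime

import requests
from airflow import DAG


def on_failure_callback(context):
    # The environment variable must hold the full Slack webhook URL
    webhook_url = os.getenv('SLACK_WEBHOOK_TOKEN')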
    slack_data = {
        'text': "@here DAG {} Failed".format(context['dag'].dag_id)
    }

    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'}
    )

dag = DAG(
    dag_id='dag_with_templated_dir',
    start_date=datetime(2020, 1, 1),
    on_failure_callback=on_failure_callback
)
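
If the callback should fire for each failing task rather than once per failed DAG run, it can be passed through `default_args`, which Airflow applies to every task in the DAG. A sketch under assumptions: the `SLACK_WEBHOOK_URL` variable name and the helper names `build_payload` and `notify_slack` are hypothetical, and the standard library is used here in place of `requests`:

```python
import json
import os
import urllib.request


def build_payload(context):
    """Name the failed task and its DAG in the Slack message body."""
    ti = context["task_instance"]
    return {"text": f"Task {ti.task_id} in DAG {ti.dag_id} failed"}


def notify_slack(context, send=urllib.request.urlopen):
    """POST the payload to the webhook; `send` is injectable for testing."""
    url = os.environ["SLACK_WEBHOOK_URL"]  # assumed variable name
    request = urllib.request.Request(
        url,
        data=json.dumps(build_payload(context)).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    return send(request)


# Every task created in the DAG picks up the callback from default_args.
default_args = {"on_failure_callback": notify_slack}
# Assumed usage: DAG(dag_id=..., default_args=default_args, ...)
```

Keeping the payload builder separate from the HTTP call makes the message format easy to check without contacting Slack.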
Concelebrate answered 27/7, 2020 at 11:50 Comment(1)
Thanks a lot for this, great idea.Infirmary
D
0

You can send an email to a slack channel. This way, you don't need any additional setup, and you can give an array of emails, one for the real email setup, and one for the slack channel.

'email': ["[email protected]", "[email protected]"]
Davidoff answered 2/7, 2023 at 20:45 Comment(0)
