How to mark an Airflow DAG run as failed if any task fails?

Is it possible to make an Airflow DAG fail if any task fails?

I usually have some cleaning up tasks at the end of a DAG and as it is now, whenever the last task succeeds the whole DAG is marked as a success.
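
For concreteness, here is a minimal sketch of the setup (Airflow 1.x style; the task ids and callables are illustrative, and I'm assuming the cleanup task uses trigger_rule=ALL_DONE so it runs even after upstream failures):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG('example_dag', start_date=datetime(2018, 1, 1), schedule_interval=None)

def work():
    raise Exception('this task fails')

def cleanup():
    print('cleaning up temporary files')

work_task = PythonOperator(task_id='work', python_callable=work, dag=dag)

cleanup_task = PythonOperator(
    task_id='cleanup',
    python_callable=cleanup,
    trigger_rule=TriggerRule.ALL_DONE,  # run even if 'work' failed
    dag=dag,
)

work_task >> cleanup_task

# 'work' fails, 'cleanup' succeeds, and the run is marked 'success',
# because the DAG run state is derived from the leaf task(s).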

Endodermis answered 27/4, 2018 at 5:5 Comment(3)
Might be connected to issues.apache.org/jira/browse/… (Cynic)
This might be related, but it is not the same: in my case the task is executed, but the whole DAG is marked as success instead of failed. (Endodermis)
I have the same problem. I would argue this is a bug. (Ruisdael)

Another solution is to add a final PythonOperator that checks the state of all tasks in the current DAG run:

from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule

def final_status(**kwargs):
    # Fail this task (and therefore the DAG run) if any other task
    # instance in this run did not succeed.
    for task_instance in kwargs['dag_run'].get_task_instances():
        if task_instance.current_state() != State.SUCCESS and \
                task_instance.task_id != kwargs['task_instance'].task_id:
            raise AirflowException(
                "Task {} failed. Failing this DAG run".format(task_instance.task_id))

final_status = PythonOperator(
    task_id='final_status',
    provide_context=True,
    python_callable=final_status,  # the function defined above
    trigger_rule=TriggerRule.ALL_DONE,  # ensures this task runs even if upstream fails
    dag=dag,
)
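
One caveat: current_state() != State.SUCCESS also matches skipped tasks, so if your DAG legitimately skips tasks (e.g. via branching), you may want to accept State.SKIPPED as well.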
Lashanda answered 26/11, 2019 at 17:40 Comment(2)
I prefer this solution, as I can write this operator once and put it at the end of any DAG without changing any of the operators before it. (Endodermis)
Is it possible to get and print the log and traceback of the failed task? (Alishiaalisia)

I'm facing a similar problem. It is not a bug, but it could be a nice feature to add this behavior as a property of the DAG.

As a workaround, you can push an XCom variable during the task that is allowed to fail, and in the downstream tasks do something like:

if ti.xcom_pull(key='state', task_ids=task_allowed_to_fail_id) == 'FAILED':
    raise ValueError('Force failure because upstream task has failed')
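
For completeness, a sketch of the push side, assuming the task allowed to fail is a PythonOperator (the names task_allowed_to_fail and do_risky_work are illustrative):

from airflow.operators.python_operator import PythonOperator

def do_risky_work():
    raise RuntimeError('simulated failure')  # stand-in for the real work

def task_allowed_to_fail(**kwargs):
    ti = kwargs['ti']
    try:
        do_risky_work()
        ti.xcom_push(key='state', value='SUCCESS')
    except Exception:
        # Swallow the error so this task itself succeeds, but record the
        # failure for downstream tasks to check via xcom_pull.
        ti.xcom_push(key='state', value='FAILED')

allowed_to_fail = PythonOperator(
    task_id='task_allowed_to_fail',
    provide_context=True,
    python_callable=task_allowed_to_fail,
    dag=dag,  # assumes a `dag` object defined elsewhere
)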

Revenant answered 13/6, 2018 at 11:32 Comment(0)
