How to restart a failed task on Airflow
I am using a LocalExecutor and my DAG has 3 tasks, where task C depends on task A. Task A and task B can run in parallel, something like below:

A-->C

B

So task A failed, but task B ran fine. Task C is yet to run because task A failed.

My question is: how do I re-run task A alone, so that task C runs once task A completes and the Airflow UI marks them as success?

Distended answered 7/4, 2017 at 6:8 Comment(0)
In the UI:

  1. Go to the DAG, and the DAG run you want to change
  2. Click on GraphView
  3. Click on task A
  4. Click "Clear"

This will let task A run again, and if it succeeds, task C should run. This works because when you clear a task's status, the scheduler will treat it as if it hadn't run before for this dag run.
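To see why this works, here is a toy sketch in plain Python (not Airflow internals) of the scheduling rule the answer describes: a task is picked up when it has no recorded state and all of its upstream tasks succeeded, so wiping task A's state makes both A and, later, C eligible again.

```python
# Toy model of the question's DAG: C depends on A; B is independent.
deps = {"A": [], "B": [], "C": ["A"]}
state = {"A": "failed", "B": "success", "C": None}  # the situation in the question

def runnable(task):
    """A task can run if it has no recorded state and all upstreams succeeded."""
    return state[task] is None and all(state[up] == "success" for up in deps[task])

def clear(task):
    """Clearing wipes the recorded state, so the scheduler sees the task as never run."""
    state[task] = None

# C cannot run: its upstream A failed, and A itself is in a terminal state.
assert not runnable("A") and not runnable("C")

clear("A")              # the "Clear" button in the UI
assert runnable("A")    # the scheduler will now pick A up again
state["A"] = "success"  # suppose the rerun succeeds this time
assert runnable("C")    # ...and C becomes eligible, exactly as the answer says
```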

Dell answered 10/4, 2017 at 18:31 Comment(11)
It is also possible to do this from the command line: airflow clear -s <start_date> -e <end_date> -t task_a <dag_name> – Confide
Thank you so much! Both the UI and the command-line approach worked for me! – Distended
Is it possible to do this in code? Something that checks after a fixed time whether the task failed and tries to clear it? – Stoll
Will this result in a new timestamp for the DAG, or will it run with the same execution time? – Beecher
@TomasJansson It will use the same execution time as the original one. But your start_date will have the new value (the current timestamp). You can see everything in the "Task Instance Details" screen. – Bladderwort
@RP-, I noticed that when trying it, but thanks for adding the comment here to clarify. – Beecher
@TomasJansson The execution date ("Run" in the GUI) will stay the same, but the "Started" and "Ended" timestamps will refer to the actual time. This method is therefore useful for re-running a whole failed DAG with the original run date, e.g. if you use the {{ds}} variable within a SQL statement; just triggering the task manually via the UI would assign the actual timestamp to the {{ds}} variable and change the parameters of the task/DAG run. – Coaxial
@Stoll Yes, it is possible to do this in code using the retries argument in your DAG. It will clear the past result and retry n times, where n is the value you set for retries. retry_delay is how you set the time to wait between tries. You can also set retry_exponential_backoff to programmatically back off when retrying. – Stagey
If you just want to re-trigger all the tasks, you can do the same thing from the Grid view by clicking the red rectangle above the tasks and, on the right, clicking "Clear existing tasks". – Faulk
Even better, if you have multiple tasks to rerun: click on the circle with the number of failed tasks, then select all -> Actions -> Clear state. – Faulk
Is it possible to do this through an API call? – Fetus
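On the last comment: Airflow 2's stable REST API exposes a clearTaskInstances endpoint (POST /api/v1/dags/{dag_id}/clearTaskInstances). A hedged sketch follows — the field names are from the Airflow 2 stable API schema as I understand it, while the host, DAG id, and credentials are placeholders; check your deployment's API docs before relying on it:

```python
import json

def build_clear_payload(task_ids, start_date, end_date, only_failed=True, dry_run=False):
    """Request body for POST /api/v1/dags/{dag_id}/clearTaskInstances
    (Airflow 2 stable REST API; field names per that schema)."""
    return {
        "task_ids": task_ids,
        "start_date": start_date,
        "end_date": end_date,
        "only_failed": only_failed,
        "dry_run": dry_run,
    }

payload = build_clear_payload(["task_a"], "2017-04-07T00:00:00Z", "2017-04-07T00:00:00Z")

# To actually send it (requires a reachable webserver and valid credentials):
# import requests
# requests.post("http://localhost:8080/api/v1/dags/my_dag/clearTaskInstances",
#               json=payload, auth=("user", "pass"))

print(json.dumps(payload))
```

Set dry_run to True first to see which task instances would be cleared without changing anything.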
Here's an alternate solution where tasks are cleared and retried automatically. If you only want to clear a single task, omit the -d (downstream) flag:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


def clear_upstream_task(context):
    # On failure, re-run t1 (and everything downstream of it, via -d) for the
    # failed run's execution date; -y skips the confirmation prompt.
    execution_date = context.get("execution_date")
    clear_tasks = BashOperator(
        task_id='clear_tasks',
        bash_command=f'airflow tasks clear -s {execution_date} -t t1 -d -y clear_upstream_task'
    )
    return clear_tasks.execute(context=context)


# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}


with DAG('clear_upstream_task',
         start_date=datetime(2021, 1, 1),
         max_active_runs=3,
         schedule_interval=timedelta(minutes=5),
         default_args=default_args,
         catchup=False
         ) as dag:
    t0 = DummyOperator(
        task_id='t0'
    )

    t1 = DummyOperator(
        task_id='t1'
    )

    t2 = DummyOperator(
        task_id='t2'
    )
    t3 = BashOperator(
        task_id='t3',
        bash_command='exit 123',
        on_failure_callback=clear_upstream_task
    )

    t0 >> t1 >> t2 >> t3
Prudy answered 19/8, 2021 at 14:15 Comment(0)
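The retries and retry_delay settings used in default_args above (and retry_exponential_backoff, mentioned in the comments) can be sketched as plain arithmetic. This is illustrative only, not Airflow's exact formula — the real implementation also adds jitter and caps the wait with max_retry_delay:

```python
from datetime import timedelta

def retry_delays(retries, retry_delay, exponential_backoff=False):
    """Illustrative wait times before each retry attempt (no jitter, no cap)."""
    delays = []
    for try_number in range(retries):
        if exponential_backoff:
            # Double the wait on each successive attempt.
            delays.append(retry_delay * (2 ** try_number))
        else:
            # Fixed wait between attempts.
            delays.append(retry_delay)
    return delays

flat = retry_delays(3, timedelta(seconds=5))
backoff = retry_delays(3, timedelta(seconds=5), exponential_backoff=True)
assert flat == [timedelta(seconds=5)] * 3
assert backoff == [timedelta(seconds=5), timedelta(seconds=10), timedelta(seconds=20)]
```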

If you want to rerun all the subtasks of the failed tasks, there is an easier way than the accepted answer, especially if you have multiple tasks to rerun.

You can clear the tasks' state (marking them as if they had never run) so that they are queued to run again. This is how:

  1. From the homepage, click the failed-tasks indicator
  2. Click the checkbox to select all the tasks
  3. Click Actions -> Clear the state

Faulk answered 3/8, 2023 at 15:11 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.