In Airflow, on failure, is there a way to repeat a group of tasks?
In my DAG, I've got a task flow like this:

... >> EmrAddStepsOperator >> EmrStepSensor

A success of EmrAddStepsOperator means, "I was able to tell EMR to start." A failure of EmrStepSensor means, "Something went wrong on the EMR task." I may be a little off about these descriptions, but that's irrelevant to the point I'm trying to make:

If the second task fails, I want to restart the first task, not the second. How do I get Airflow to restart the first task when the second task fails?

Chariot asked 14/8, 2019 at 17:16 Comment(3)
I reckon there's no straightforward way to achieve this other than writing a custom EmrAddStepsBlockingOperator (a fusion of EmrAddStepsOperator and EmrStepSensor). See this. TIP: you can consider employing Python's multiple inheritance for this, but when I tried it, I ran into this – Suellen
@Suellen Hm, that's interesting. Am I thinking about this the wrong way when I say I would want something like this? I can't imagine a scenario where the sensor would fail and you wouldn't need the EmrAddStepsOperator to retry. – Chariot
A very untidy solution would be to have your DAG push all failed tasks to an external store, which serves as input for another DAG. In effect, every day a 2nd DAG will pick up the tasks that failed in the 1st DAG and rerun them; the structure of the 2nd DAG will be dynamic (it will depend on which / how many tasks failed in the 1st DAG). Read the UPDATE-1 part here – Suellen
Solution based on the on_retry_callback parameter

EDIT: I used this solution for Airflow version 2.0.1. As @Dekko states in the comments, the activate_dag_runs parameter of clear_task_instances is deprecated in version 2.1.1.

You can combine the clear_task_instances function from the built-in module airflow.models.taskinstance with the on_retry_callback parameter of an operator to retry the last n tasks when the current task fails.

You can simply add the following Python code to your DAG file:

from airflow.models.taskinstance import clear_task_instances
from airflow.utils.session import provide_session  # in older versions: airflow.utils.db


@provide_session
def retry_upstream_tasks(context, session=None, adr=False):
    # Walk `retry_upstream_depth` tasks up the (linear) chain, collecting
    # the task ids that should be cleared together with the failing task.
    task_ids_to_retry = []
    a_task = context['task']
    for _ in range(context['params']['retry_upstream_depth']):
        num_upstream_tasks = len(a_task.upstream_task_ids)
        if num_upstream_tasks != 1:
            raise ValueError(
                f'The number of upstream tasks of "{a_task}" must be 1, '
                f'but it is {num_upstream_tasks}'
            )
        upstream_task_id = list(a_task.upstream_task_ids)[0]
        task_ids_to_retry.append(upstream_task_id)
        a_task = [t for t in context['dag'].tasks if t.task_id == upstream_task_id][0]

    # Map task ids to the task instances of the current DAG run, then clear
    # the collected instances (oldest first) so the scheduler re-runs them.
    all_task_ids_to_instances = {
        t_ins.task_id: t_ins for t_ins in context['dag_run'].get_task_instances()
    }
    task_instances_to_retry = [all_task_ids_to_instances[tid] for tid in task_ids_to_retry[::-1]]

    clear_task_instances(
        tis=task_instances_to_retry,
        session=session,
        activate_dag_runs=adr,  # deprecated from Airflow 2.1.1; omit it there
        dag=context['dag'],
    )

task_depends_on_previous_tasks = ANY_OPERATOR(  # You can apply this to any operator.
    task_id='task_depends_on_previous_tasks',
    ...
    on_retry_callback=retry_upstream_tasks,
    retries=3,
    params={'retry_upstream_depth': 2}  # You can change the depth
)

Pass a {'retry_upstream_depth': n} value to the params parameter of your task's operator. You can change n to control how many tasks before the current one should be retried.

For example

The order of your tasks is as follows:

task_1 >> task_2 >> task_depends_on_previous_tasks

And you want to retry task_1 and task_2 sequentially when task_depends_on_previous_tasks fails.

Then, you should set retry_upstream_depth to 2.
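
Concretely, a minimal sketch of that wiring might look like this. The BashOperator tasks and their bash_command values are made-up placeholders just to exercise the callback; it reuses retry_upstream_tasks from above:

from airflow.operators.bash import BashOperator

task_1 = BashOperator(task_id='task_1', bash_command='echo 1')
task_2 = BashOperator(task_id='task_2', bash_command='echo 2')
task_depends_on_previous_tasks = BashOperator(
    task_id='task_depends_on_previous_tasks',
    bash_command='exit 1',  # always fails here, just to trigger the retry callback
    on_retry_callback=retry_upstream_tasks,
    retries=3,
    params={'retry_upstream_depth': 2},  # clears task_2 and task_1 on each retry
)
task_1 >> task_2 >> task_depends_on_previous_tasks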

Important Notes

  • In this case, the tasks which will be retried (except the first/oldest one) should have only one upstream task each, i.e. these tasks should form a straight line.

  • The number of retries is limited by the retries parameter of the current task. So, if retries=3, the current task can fail up to 3 times, and on each retry the previous n tasks are triggered before the current task is triggered.

Like answered 13/7, 2021 at 7:55 Comment(1)
This solution worked for me. However, the activate_dag_runs argument of the clear_task_instances function is deprecated starting from Airflow 2.1.1, so I omitted that argument. – Dekko
On Airflow 1, I used the operator and sensor inside a PythonOperator.

Basically, all the data returned from an operator goes to an XCom. You can get the information from the previous task by calling the execute method of the specific operator and storing the result in a variable. Then run your sensor with the data you need; if the sensor fails, it will raise an Airflow exception and the PythonOperator will retry as it has been parameterized.

Example:

from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
from airflow.operators.python_operator import PythonOperator


def python_emr_job(emr):
    # Create the job flow and grab its id from the operator's return value
    # (the data that would normally be pushed to XCom).
    job_flow_creator = EmrCreateJobFlowOperator(
        task_id='emr_create_job',
        job_flow_overrides=emr,
        aws_conn_id='...',
        emr_conn_id='...'
    )

    job_flow_id = job_flow_creator.execute(dict())

    # Run the sensor inside the same callable; if it fails, the surrounding
    # PythonOperator retries, which re-runs both steps together.
    job_sensor = EmrJobFlowSensor(
        task_id='emr_job_sensor',
        job_flow_id=job_flow_id,
        aws_conn_id='...'
    )

    job_sensor.execute(dict())


with DAG(...) as dag:

    emr_config = {...}

    emr_task = PythonOperator(
        task_id='emr_task',
        python_callable=python_emr_job,
        op_kwargs={'emr': emr_config},
        retries=3
    )
Yate answered 22/6, 2021 at 21:36 Comment(0)
I guess you could easily put both operators in a SubDagOperator and set the retry parameters on the SubDagOperator.
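
A minimal sketch of that idea, using Airflow 1.x import paths; the subdag factory, task ids, and the '...' placeholders are illustrative assumptions, and whether a retry re-runs already-successful inner tasks depends on your Airflow version:

from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

default_args = {...}


def emr_subdag(parent_dag_id, child_dag_id, default_args):
    # The operator and its sensor live together in one sub-DAG,
    # so they can be retried as a unit.
    subdag = DAG(dag_id=f'{parent_dag_id}.{child_dag_id}', default_args=default_args)
    add_steps = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id='...',
        steps=[...],
        aws_conn_id='...',
        dag=subdag,
    )
    watch_step = EmrStepSensor(
        task_id='watch_step',
        job_flow_id='...',
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps')[0] }}",
        aws_conn_id='...',
        dag=subdag,
    )
    add_steps >> watch_step
    return subdag


with DAG('emr_parent', default_args=default_args, schedule_interval=None) as dag:
    emr_group = SubDagOperator(
        task_id='emr_group',
        subdag=emr_subdag(dag.dag_id, 'emr_group', default_args),
        retries=3,  # the retry parameters go on the SubDagOperator itself
    )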

Egomania answered 15/8, 2019 at 11:14 Comment(0)
In my case, I had a problem with creating EMR clusters because of a 'ThrottlingException'.

So I just added the retry parameters with

EmrCreateJobFlowOperator(..., retries=5, retry_exponential_backoff=True)

and finally all the EMR clusters were created as I wanted.
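
For reference, a fuller version of that call might look like the following sketch; the task id, overrides, and connection id are placeholders, and retry_delay is the base delay that the exponential backoff doubles:

from datetime import timedelta

from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator

create_cluster = EmrCreateJobFlowOperator(
    task_id='create_emr_cluster',
    job_flow_overrides={...},          # your cluster spec
    aws_conn_id='aws_default',
    retries=5,                         # up to 5 extra attempts on ThrottlingException
    retry_delay=timedelta(minutes=1),  # base delay between attempts
    retry_exponential_backoff=True,    # ~1m, 2m, 4m, ... between retries
)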

Blackcap answered 2/3, 2023 at 5:41 Comment(0)