Retry Airflow task instance only on certain Exception
Asked Answered
U

2

9

What's the best way to retry an Airflow operator only for certain failures/exceptions?

For example, let's assume that I have an Airflow task which relies on the availability of an external service. If this service becomes unavailable during the task execution, I would like to retry later (max 3 retries). For other failures I do not want to retry.

My current approach is to use the on_failure_callback and manipulate context["ti"].task.retries on the desired exception by parsing context["exception"], but I consider this as messy and hard to understand. Are there better options?

Unpin answered 26/6, 2019 at 13:59 Comment(3)
You can have a sensor that checks the availability and add the airflow task downstream. By correctly configuring poke_interval and timeout, you can achieve the exact retries you wantFactitious
Yes, I can check before if the service is available. However, my task takes a while to execute, so it's possible that the service goes down only during the task execution.Unpin
I asked a similar question (maybe even duplicate) yesterday and I found a way. https://mcmap.net/q/1319132/-dynamically-change-the-number-of-task-retriesEdwinedwina
D
2

Most of airflow's operators use a Hook class to complete the work.

If you can create your own PythonOperator and try/catch the exceptions you want to avoid and throw the exceptions you want to trigger the retry it will comply with airflow architecture seamlessly:

# python operator function
def my_operation():
    try:
        hook = SomeHook()
        hook.use_it()
    except IgnorableException as e:
        pass


# then:
my_operator = PythonOperator(
    task_id='my-operator',
    python_callable=my_operation
)

It gives you more control over your Operator and DAG life-cycle.

Danley answered 22/12, 2019 at 17:11 Comment(0)
L
2

I know this is almost exactly what you specified you don't want to do - so apologies if it's no use (perhaps could be useful for someone else). It does differ very slightly from your requirement as it sets retries to a number initially and then checks whether to honour that retry number based on the contents of exceptions - rather than setting retries dynamically as an error message is encountered. i.e. it uses on_retry_callback instead of on_failure_callback.

You could set retries to the number you would like, but then use an on_retry_callback to alter the task State. Here's an example that always generates exceptions, but manipulates the task State based on the exception name:

from airflow.decorators import dag,task
from airflow.utils.state import State
from datetime import timedelta, datetime
import random

@dag(
dag_id="retry_testing"
,tags=['utils','experimental']
,schedule_interval=None
,start_date=datetime(2020,1,1)
,description="Testing the on_retry_callback parameter"
,params={"key":"value"}
,render_template_as_native_obj=True
)
def taskflow():

    def exception_parser(context):
        """
        A function that checks the class name of the Exception thrown.
        
        Different exceptions trigger behaviour of allowing the task to fail, retry or succeed
        """
        print('retrying...')
        ti = context["task_instance"]
        exception_raised = context.get('exception')
        if exception_raised.__class__.__name__ == 'ZeroDivisionError':
            print("div/0 error, setting task to failed")
            ti.set_state(State.FAILED)
        elif exception_raised.__class__.__name__ == 'TypeError':
            print("Type Error - setting task to success")
            ti.set_state(State.SUCCESS)
        else:
            print("Not div/0 error, trying again...")

    @task(
        retries=10,
        retry_delay=timedelta(seconds=3),
        on_retry_callback=exception_parser,
    )
    def random_error():
        """Does some common runtime errors based on the value of a random number."""
        r = random.randrange(0,10)
        print(f"random integer = {r}")
        if r in [0,1,2]:
            # Produce a ZeroDivisionError
            x = 1/0
            print(x)
        elif r in [3,4,5]:
            # Produce a TypeError
            x = 'not a number'+1
        else:
            # Produce a KeyError
            mydict = {"thiskey":"foo"}
            get_missing_key = mydict["thatkey"]
            print(get_missing_key)

    random_error()
        
taskflow()

This has been tested on MWAA v2.5.1

Lendlease answered 3/7, 2023 at 3:49 Comment(1)
Note that if you do this and if you also have a on_failure_callback, your failure callback will not be called the non-retryable error occurs on the first time (or any time before the last retry attempt). I encountered that bug just now when I tried adopting this solution. Instead of ti.set_state(State.FAILED) I used ti.handle_failure(exception_raised, context=context, force_fail=True), which does call the on_failure_callback if one is provided.Sketchy

© 2022 - 2024 — McMap. All rights reserved.