Solution based on the on_retry_callback parameter
EDIT: I used this solution for Airflow version 2.0.1. As @obayram states, the activate_dag_runs parameter of clear_task_instances is deprecated in version 2.1.1.
You can combine the clear_task_instances function from the built-in module airflow.models.taskinstance with the on_retry_callback parameter of operators to retry the last n tasks when the current task fails.
You can simply add the following Python code to your DAG file:
from airflow.models.taskinstance import clear_task_instances
from airflow.utils.db import provide_session

@provide_session
def retry_upstream_tasks(context, session=None, adr=False):
    task_ids_to_retry = []
    j, a_task = 0, context['task']
    while j < context['params']['retry_upstream_depth']:
        num_upstream_tasks = len(a_task.upstream_task_ids)
        if num_upstream_tasks != 1:
            raise ValueError(f'The # of upstream tasks of "{a_task}" must be 1, but got {num_upstream_tasks}')
        upstream_task_id = list(a_task.upstream_task_ids)[0]
        task_ids_to_retry.append(upstream_task_id)
        upstream_task = [t for t in context['dag'].tasks if t.task_id == upstream_task_id][0]
        a_task = upstream_task
        j += 1
    all_task_ids_to_instances = {t_ins.task_id: t_ins for t_ins in context['dag_run'].get_task_instances()}
    task_instances_to_retry = [all_task_ids_to_instances[tid] for tid in task_ids_to_retry[::-1]]
    clear_task_instances(tis=task_instances_to_retry, session=session, activate_dag_runs=adr, dag=context['dag'])
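To make the upstream traversal concrete without a running Airflow instance, here is a minimal, Airflow-free sketch of the same loop. The FakeTask class and the collect_upstream_task_ids helper below are hypothetical stand-ins for illustration only; real Airflow operators expose upstream_task_ids in the same way.

```python
# Airflow-free sketch of the traversal inside retry_upstream_tasks.
# FakeTask is a hypothetical stand-in for an Airflow operator.

class FakeTask:
    def __init__(self, task_id, upstream_task_ids=()):
        self.task_id = task_id
        self.upstream_task_ids = set(upstream_task_ids)

def collect_upstream_task_ids(task, all_tasks, depth):
    """Walk `depth` tasks upstream of `task`, mirroring the while-loop above."""
    task_ids_to_retry = []
    a_task = task
    for _ in range(depth):
        if len(a_task.upstream_task_ids) != 1:
            raise ValueError(f'"{a_task.task_id}" must have exactly 1 upstream task')
        upstream_task_id = next(iter(a_task.upstream_task_ids))
        task_ids_to_retry.append(upstream_task_id)
        a_task = next(t for t in all_tasks if t.task_id == upstream_task_id)
    # Reverse so the oldest task is cleared (and therefore re-run) first.
    return task_ids_to_retry[::-1]

# Wiring equivalent to: task_1 >> task_2 >> task_depends_on_previous_tasks
t1 = FakeTask('task_1')
t2 = FakeTask('task_2', {'task_1'})
t3 = FakeTask('task_depends_on_previous_tasks', {'task_2'})

print(collect_upstream_task_ids(t3, [t1, t2, t3], depth=2))  # ['task_1', 'task_2']
```

Note the reversal at the end: the callback collects upstream task ids newest-first, then clears them oldest-first so the chain re-runs in its original order.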
task_depends_on_previous_tasks = ANY_OPERATOR(  # You can apply this to any operator.
    task_id='task_depends_on_previous_tasks',
    ...
    on_retry_callback=retry_upstream_tasks,
    retries=3,
    params={'retry_upstream_depth': 2}  # You can change the depth.
)
Pass {'retry_upstream_depth': n} to the params parameter of your task operator. You can change n to control how many tasks you want to retry before the current task.
For example, suppose your tasks are ordered as follows:
task_1 >> task_2 >> task_depends_on_previous_tasks
and you want to retry task_1 and task_2 sequentially when task_depends_on_previous_tasks fails. Then you should set retry_upstream_depth to 2.
Important Notes
In this case, each task that will be retried (except the first/oldest one) must have exactly one upstream task, i.e. the tasks should form a single linear chain.
The number of retries is limited by the retries parameter of the current task. So if retries=3, the current task can fail up to 3 times, and on each retry the previous n tasks are triggered before the current task is triggered.
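A quick sanity check on that arithmetic (the variable names below are illustrative, not Airflow settings): since on_retry_callback fires once per retry, each upstream task in the cleared chain can run at most 1 + retries times in total.

```python
# Illustrative arithmetic only; `retries` mirrors the operator parameter above.
retries = 3                # current task: 1 initial run + up to 3 retries
retry_upstream_depth = 2   # task_1 and task_2 are cleared on every retry

callback_fires = retries   # on_retry_callback runs once per retry
max_runs_per_upstream_task = 1 + callback_fires  # one initial run + one re-run per retry
print(max_runs_per_upstream_task)  # 4
```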