Apache Airflow: Delay a task for some period of time

I am trying to execute a task 5 minutes after the parent task finishes inside a DAG.

DAG : Task 1 ----> Wait for 5 minutes ----> Task 2

How can I achieve this in Apache Airflow? Thanks in advance.

While asked 5/3, 2019 at 11:52 Comment(0)

The desired behaviour can be achieved by introducing a task that forces a delay of the specified duration between your Task 1 and Task 2.

This can be done using PythonOperator:

import time
from airflow.operators.python import PythonOperator

# a no-op task that simply sleeps for 300 seconds (5 minutes)
delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
                                                   dag=my_dag,
                                                   python_callable=lambda: time.sleep(300))

task_1 >> delay_python_task >> task_2

Or using BashOperator:

from airflow.operators.bash import BashOperator

# "sleep 5m" blocks a shell for 5 minutes
delay_bash_task: BashOperator = BashOperator(task_id="delay_bash_task",
                                             dag=my_dag,
                                             bash_command="sleep 5m")
task_1 >> delay_bash_task >> task_2

Note: The given code-snippets are NOT tested


UPDATE-1

Here are some other ways of introducing a delay:

  • UPDATE: do NOT use this, as pointed out by @Vit.ai. Original point: on_success_callback / on_failure_callback: depending on whether Task 2 is supposed to run upon success or failure of Task 1, you can pass lambda: time.sleep(300) in either of these params of Task 1.
  • pre_execute() / post_execute(): invoking time.sleep(300) in Task 1's post_execute() or Task 2's pre_execute() would also have the same effect. Of course, this involves modifying the code of your tasks (1 or 2), so it is better avoided.

Personally, I would prefer the extra-task approach because it makes things more explicit and doesn't falsely exaggerate the runtime of your Task 1 or Task 2.

Monandrous answered 5/3, 2019 at 13:3 Comment(6)
I tried this, but it keeps the DAG in the running state. So if my wait time is 2 days, there will be a lot of concurrent DAG runs that keep on running, causing new DAG runs to sit in the queued state for 2 days. Is there any workaround such that the DAG releases the thread while sleeping?While
@Spandan Singh I can think of 2 possible workarounds: [1] have a continuously running DAG that triggers other DAGs at the right time using TriggerDagRunOperator; [2] keep triggering your DAG frequently and, if the right time hasn't come yet, skip execution using either AirflowSkipException or ShortCircuitOperator (see the sketch after these comments)Monandrous
The links are outdated now; one of the corresponding links is github.com/apache/airflow/blob/…Paraphrastic
@Monandrous But when we do time.sleep it stops the current Celery worker from picking up other tasks; this way, if we have 4 Celery workers, 4 tasks with the sleep function, and 1 task without sleep, then the task without sleep has no worker left to execute it. Correct me if I'm wrong? In my opinion, a better way to wait is: in your current Python task, check the retry count, and if it is zero, raise an exception and apply a retry delay of 5 minutes; this way the task stays idle for 5 minutes without blocking the current worker (a sketch of this follows these comments).Graticule
@Deepak Tripathi I acknowledge this is a gaping hole in the proposed solution. I haven't been using Airflow for a while now, but do check out this thread tooMonandrous
it didn't work with lambdaKibosh
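
A minimal sketch of workaround [2] above (assuming Airflow 2.x; RUN_AFTER, my_dag and task_2 are illustrative placeholders, not from the comment):

import pendulum
from airflow.operators.python import ShortCircuitOperator

# hypothetical target instant; compute it from your own logic
RUN_AFTER = pendulum.datetime(2019, 3, 7, tz="UTC")

# while the callable returns False, downstream tasks are skipped
# and the next scheduled run tries again
wait_until_ready = ShortCircuitOperator(task_id="wait_until_ready",
                                        python_callable=lambda: pendulum.now("UTC") >= RUN_AFTER,
                                        dag=my_dag)

wait_until_ready >> task_2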
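
And a minimal sketch of the retry-based delay suggested in the comments (the task and callable names are illustrative): deliberately fail the first attempt and let retry_delay do the waiting, so no worker slot is held in the meantime.

from datetime import timedelta

from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator

def _fail_first_try(ti=None):
    # ti.try_number is 1 on the first attempt
    if ti.try_number <= 1:
        raise AirflowException("deliberate failure; retrying in 5 minutes")

delay_via_retry = PythonOperator(task_id="delay_via_retry",
                                 python_callable=_fail_first_try,
                                 retries=1,
                                 retry_delay=timedelta(minutes=5),
                                 dag=my_dag)

task_1 >> delay_via_retry >> task_2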

It's 2023 now; although it's linked in one of the comments, the way to implement this is with TimeDeltaSensor() or TimeDeltaSensorAsync():

import pendulum
from airflow.decorators import dag
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync

@dag(
    start_date=pendulum.datetime(2023, 9, 1),
    schedule=None,
    catchup=False,
    tags=["example"],
)
def example_wait():
    # swap in TimeDeltaSensorAsync to defer instead of occupying a worker slot
    TimeDeltaSensor(task_id="wait_some_seconds", delta=pendulum.duration(seconds=10))

example_wait()
 

See docs

Casals answered 16/9, 2023 at 1:20 Comment(5)
Unfortunately, this does not work with @once or None scheduled DAGs. See: github.com/apache/airflow/issues/9046Tocology
In the case of manual runs, use DateTimeSensorAsync(task_id="wait_some_seconds", target_datetime="{{ dag_run.logical_date.replace(hour=0, minute=0, second=10) }}") (a cleaned-up sketch follows these comments)Casals
Thanks! Also, the TimeSensor task worked just fine.Tocology
There seems to be an error in the documentation. TimeDeltaSensor waits the specified delta after data_interval_end github.com/apache/airflow/discussions/35184Chickweed
keep in mind that in airflow, jobs run at the end of the intervalCasals
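
A tidied-up sketch of the manual-run workaround from the comments (assuming Airflow 2.x; the 5-minute offset is illustrative). DateTimeSensorAsync defers instead of holding a worker slot, and its target_datetime field is templated, so the Jinja expression is resolved per run:

from airflow.sensors.date_time import DateTimeSensorAsync

# for a manually triggered run, logical_date is the moment of the trigger,
# so this waits 5 minutes from when the DAG run was started
wait_five_minutes = DateTimeSensorAsync(task_id="wait_five_minutes",
                                        target_datetime="{{ dag_run.logical_date.add(minutes=5) }}")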

@y2k-shubham gave the best answer to date; however, I want to warn against using the callback solution, as Airflow first marks the task as success and then executes the callback, which means task2 will not see any delay. If you don't want to use a separate task, you can use something like this:

< ... >
task1 = DummyOperator(task_id='task1', dag=dag)
# override post_execute so the 5-minute sleep happens inside task1's run,
# after execute() finishes but before the task is marked as success
task1.post_execute = lambda **x: time.sleep(300)
task2 = DummyOperator(task_id='task2', dag=dag)

task1 >> task2
Desiccated answered 1/12, 2020 at 18:0 Comment(0)

You can return pendulum.now() from the first task to initialize a TimeSensor:

import pendulum
from airflow.decorators import task
from airflow.sensors.time_sensor import TimeSensor

@task
def task_1():
    return pendulum.now()

@task
def task_2():
    ...


first_task_finished_at = task_1()
# if .add() on the XCom result does not resolve, see the variant below
TimeSensor(task_id="wait_5_minutes",
           target_time=first_task_finished_at.add(minutes=5)) >> task_2()
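
If the .add() call on the XCom result above does not resolve in your Airflow version, a variant of the same idea (a sketch, not from the original answer, assuming Airflow 2.x) is to have task_1 return the target timestamp itself and pull it into a DateTimeSensor, whose target_datetime field is templated:

import pendulum
from airflow.decorators import task
from airflow.sensors.date_time import DateTimeSensor

@task
def task_1():
    # return the *target* time (now + 5 minutes) as an ISO-string XCom
    return pendulum.now("UTC").add(minutes=5).isoformat()

# target_datetime is templated, so the XCom is pulled at runtime
wait = DateTimeSensor(task_id="wait_until_target",
                      target_datetime="{{ ti.xcom_pull(task_ids='task_1') }}")

task_1() >> wait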

TimeDeltaSensor is not suitable because it waits the specified delta after context["data_interval_end"]:

def poke(self, context: Context):
  target_dttm = context["data_interval_end"]
  target_dttm += self.delta
  self.log.info("Checking if the time (%s) has come", target_dttm)
  return timezone.utcnow() > target_dttm
Chickweed answered 25/10, 2023 at 19:43 Comment(0)
