I am trying to execute a task after 5 minutes from the parent task inside a DAG.
DAG : Task 1 ----> Wait for 5 minutes ----> Task 2
How can I achieve this in Apache Airflow? Thanks in advance.
This behaviour can be achieved by introducing a task that forces a delay of the specified duration between your Task 1 and Task 2.
This can be done using PythonOperator:
import time
from airflow.operators.python import PythonOperator

delay_python_task: PythonOperator = PythonOperator(
    task_id="delay_python_task",
    dag=my_dag,
    python_callable=lambda: time.sleep(300),  # block for 300 seconds (5 minutes)
)
task_1 >> delay_python_task >> task_2
Or using BashOperator as well:
from airflow.operators.bash import BashOperator

delay_bash_task: BashOperator = BashOperator(
    task_id="delay_bash_task",
    dag=my_dag,
    bash_command="sleep 5m",  # sleep for 5 minutes in the shell
)
task_1 >> delay_bash_task >> task_2
Note: The given code-snippets are NOT tested
UPDATE-1
Here are some other ways of introducing a delay:
on_success_callback / on_failure_callback: Depending on whether Task 2 is supposed to run upon success or failure of Task 1, you can pass lambda context: time.sleep(300) in either of these params of Task 1 (the callback receives the task context as its argument).
pre_execute() / post_execute(): Invoking time.sleep(300) in Task 1's post_execute() or Task 2's pre_execute() would also have the same effect. Of course, this would involve modifying the code of your tasks (1 or 2), so it is better avoided.
Personally I would prefer the extra task approach because it makes things more explicit and doesn't falsely exaggerate the runtime of your Task 1 or Task 2.
TriggerDagRunOperator [2]: Keep triggering your DAG frequently and, if the right time hasn't come yet, skip execution using either AirflowSkipException or ShortCircuitOperator.
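The "has the right time come yet" check in the last option can be sketched in plain Python. This is only an illustration under stated assumptions: the helper name delay_elapsed and its parameters are hypothetical, and wiring it into ShortCircuitOperator (as its python_callable) or raising AirflowSkipException when it returns False is left out, since that depends on where Task 1's finish time is stored.

```python
from datetime import datetime, timedelta, timezone


def delay_elapsed(finished_at, now=None, delay=timedelta(minutes=5)):
    """Return True once `delay` has passed since `finished_at` (hypothetical helper)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now - finished_at >= delay


# Example timestamps are made up for illustration.
finished = datetime(2023, 9, 1, 12, 0, tzinfo=timezone.utc)
print(delay_elapsed(finished, now=finished + timedelta(minutes=3)))  # False
print(delay_elapsed(finished, now=finished + timedelta(minutes=5)))  # True
```

A callable like this returns a plain boolean, which is the kind of value a short-circuit style check needs in order to decide whether downstream work should proceed.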
– Monandrous
It's 2023 now. Although it is only linked in one of the comments, the way to implement this is with TimeDeltaSensor() or TimeDeltaSensorAsync():
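import pendulum
from airflow.decorators import dag
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync

@dag(
    start_date=pendulum.datetime(2023, 9, 1),
    schedule=None,
    catchup=False,
    tags=["example"],
)
def example_wait():
    # Sensor succeeds once the given delta has elapsed
    t10 = TimeDeltaSensor(task_id="wait_some_seconds", delta=pendulum.duration(seconds=10))
    t10

example_wait()  # call the decorated function so the DAG is registered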
See docs
DateTimeSensorAsync(task_id="wait_some_seconds", target_time="""{{ dag_run.logical_date.replace(hour=0, minute=0, second=10) }}""")
– Casals
TimeDeltaSensor waits the specified delta after data_interval_end: github.com/apache/airflow/discussions/35184 –
Chickweed
@y2k-shubham gave the best answer to date. However, I want to warn against using the callback solution: Airflow first marks the task as successful and only then executes the callback, which means task2 will not see any delay. If you don't want to use a separate task, you can use something like this:
< ... >
task1 = DummyOperator(task_id='task1', dag=dag)
task1.post_execute = lambda **x: time.sleep(300)
task2 = DummyOperator(task_id='task2', dag=dag)
task1 >> task2
You can return pendulum.now() from the first task to initialise a TimeSensor:
@task
def task_1():
    return pendulum.now()

@task
def task_2():
    ...

first_task_finished_at = task_1()
TimeSensor(target_time=first_task_finished_at.add(minutes=5)) >> task_2()
TimeDeltaSensor is not suitable, because it waits the specified delta after context["data_interval_end"]:
def poke(self, context: Context):
    target_dttm = context["data_interval_end"]
    target_dttm += self.delta
    self.log.info("Checking if the time (%s) has come", target_dttm)
    return timezone.utcnow() > target_dttm
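To make the difference concrete, here is a small plain-Python sketch (no Airflow needed) that mirrors the target-time arithmetic of poke() above. The function names and the example timestamps are made up for illustration, and the sketch assumes Task 1 finishes 20 minutes after the data interval ends.

```python
from datetime import datetime, timedelta, timezone


def time_delta_sensor_target(data_interval_end, delta):
    # Mirrors poke() above: the wait is anchored to the data interval end.
    return data_interval_end + delta


def wanted_target(task_1_finished_at, delta):
    # What the question asks for: the wait is anchored to Task 1's finish time.
    return task_1_finished_at + delta


delta = timedelta(minutes=5)
interval_end = datetime(2023, 9, 1, 0, 0, tzinfo=timezone.utc)
task_1_done = interval_end + timedelta(minutes=20)  # assumed finish time of Task 1

sensor_target = time_delta_sensor_target(interval_end, delta)
print(sensor_target.isoformat())  # 2023-09-01T00:05:00+00:00
print(wanted_target(task_1_done, delta).isoformat())  # 2023-09-01T00:25:00+00:00
print(sensor_target < task_1_done)  # True
```

In this example the sensor's target time has already passed when Task 1 finishes, so a sensor placed after Task 1 would succeed on its first poke and Task 2 would start without the intended 5-minute delay.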