Airflow: Implementing wait (sleep) task - efficiently

I need to implement a waiting task in Airflow. The waiting time should be around a couple of hours.

First, TimeDeltaSensor is just not working.

SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
    task_id="sleep_for_11_min",
    delta=timedelta(minutes=SLEEP_MINUTES_1ST),
)

With a daily schedule like:

schedule_interval='30 06 * * *'

it just waits until the next scheduled run:

[2020-01-15 18:10:21,800] {time_delta_sensor.py:45} INFO - Checking if the time (2020-01-16 06:41:00+00:00) has come

That is painfully obvious in the code: https://github.com/apache/airflow/blob/master/airflow/sensors/time_delta_sensor.py#L43

(not to mention known bug when using schedule: None or @once)
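
The target in the log line above follows from how the sensor builds it: it takes the schedule that follows execution_date and only then adds delta. Here is a minimal pure-Python sketch of that arithmetic. It assumes the daily 06:30 schedule from the question and a run whose execution_date is close to the time of the log entry. The helper names are hypothetical, and the "following schedule" is hard-coded for this one cron expression rather than computed by Airflow.

```python
from datetime import datetime, timedelta, timezone

def following_daily_schedule(execution_date, hour=6, minute=30):
    # Stand-in for dag.following_schedule() with '30 06 * * *':
    # the next 06:30 strictly after execution_date.
    candidate = execution_date.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= execution_date:
        candidate += timedelta(days=1)
    return candidate

def time_delta_sensor_target(execution_date, delta):
    # The sensor waits for (next schedule + delta), not (now + delta).
    return following_daily_schedule(execution_date) + delta

# Assumed execution_date, chosen to be close to the log timestamp in the question.
execution_date = datetime(2020, 1, 15, 18, 10, tzinfo=timezone.utc)
target = time_delta_sensor_target(execution_date, timedelta(minutes=11))
print(target.isoformat())  # 2020-01-16T06:41:00+00:00, the time shown in the log
```

So an 11-minute delta turns into a wait of more than 12 hours, because the delta is measured from the next schedule boundary rather than from the moment the task starts.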

Next try was with TimeSensor like this:

SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeSensor(
    task_id="sleep_for_11_min",
    provide_context=True,
    target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
    trigger_rule=TriggerRule.NONE_FAILED
)

And this actually worked well, but in poke mode it occupies one worker for the entire wait. I received a suggestion to use reschedule mode, but just adding:

mode='reschedule',

generates a new target time on every reschedule check, so the task never finishes:

[2020-01-15 15:36:42,818] {time_sensor.py:39} INFO - Checking if the time (14:47:42.707565) has come
[2020-01-15 15:36:42,981] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
....
[2020-01-15 15:38:51,306] {time_sensor.py:39} INFO - Checking if the time (14:49:51.079783) has come
[2020-01-15 15:38:51,331] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
...
[2020-01-15 15:41:00,587] {time_sensor.py:39} INFO - Checking if the time (14:52:00.202168) has come
[2020-01-15 15:41:00,614] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
.....

(note that Airflow is mixing UTC and my timezone UTC+1 in the log here)

The next try was to generate target_time for TimeSensor relative to the execution_date of the DAG. But several attempts were unsuccessful, for example:

task_target_time = '{{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}'
sleep_task_1 = TimeSensor(
    task_id="sleep_for_11_min",
    provide_context=True,
#   target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
#   target_time = task_target_time,
#   target_time=datetime.strptime('{{ execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST) }}','%Y-%m-%dT%H:%M:%S'),
#   target_time='{{ execution_date }}'+ timedelta(minutes=SLEEP_MINUTES_1ST),
    target_time = ('{{ task_instance.execution_date }}' + timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
    poke_interval=120,
    mode='reschedule',
    timeout=10*60*60,
    trigger_rule=TriggerRule.NONE_FAILED
)

In the commented lines (target_time...) you can see just some of the combinations I have tried. Several failed immediately on DAG creation, and some generate an error like this during the run:

[2020-01-15 17:56:39,388] {time_sensor.py:39} INFO - Checking if the time ({{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}) has come
[2020-01-15 17:56:39,389] {taskinstance.py:1058} ERROR - '>' not supported between instances of 'datetime.time' and 'str'
Traceback (most recent call last):
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/base_sensor_operator.py", line 107, in execute
    while not self.poke(context):
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/time_sensor.py", line 40, in poke
    return timezone.utcnow().time() > self.target_time
TypeError: '>' not supported between instances of 'datetime.time' and 'str'
[2020-01-15 17:56:39,390] {taskinstance.py:1089} INFO - Marking task as FAILED.

I think I understand the theory: the task context, including execution_date, isn't available when the operator is created, only at runtime. The Jinja template would render to a Pendulum object that should be converted to a time, but at creation time the template is just a string, so I don't get the Pendulum methods.
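
Both failure modes described above can be reproduced without Airflow. This is a small illustration in plain Python, not Airflow's own code. The helper target_time_at_runtime is hypothetical and only shows that the arithmetic is trivial once a real datetime is available.

```python
from datetime import datetime, time, timedelta

SLEEP_MINUTES_1ST = 11

# At DAG-creation time a template is only text, so date arithmetic on it fails.
template = "{{ execution_date }}"
try:
    template + timedelta(minutes=SLEEP_MINUTES_1ST)
    creation_error = None
except TypeError as exc:
    creation_error = exc

# If the string reaches the sensor unconverted, the runtime comparison fails too.
try:
    time(17, 56) > template
    compare_error = None
except TypeError as exc:
    compare_error = exc

def target_time_at_runtime(execution_date, minutes):
    # Hypothetical helper: with the real datetime in hand, the conversion is simple.
    return (execution_date + timedelta(minutes=minutes)).time()

print(type(creation_error).__name__, type(compare_error).__name__)
print(target_time_at_runtime(datetime(2020, 1, 15, 6, 30), SLEEP_MINUTES_1ST))
```

This is why the computation has to happen inside the sensor's poke method, where the context is available, rather than in the operator's constructor arguments.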

But WHY is it so hard to create a simple:

sleep 1000

in Airflow?

(Airflow: v1.10.6, python 3.6.8)

Shakeup answered 15/1, 2020 at 17:59 Comment(0)
D
4

TimeSensor goes into a reschedule loop because target_time is recomputed to a different, later value on every check: the expression below is evaluated each time the DAG file is parsed, not once when the task first starts. As a result the condition is never fulfilled.

    target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),

In using TimeSensor this way, you must set target_time to a time value that is the latest time that you expect for a condition to have been satisfied.
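
The loop can be illustrated with a small simulation in plain Python. It is a simplified model, not Airflow's code: it assumes the DAG file is re-evaluated before every check in reschedule mode, and parse_dag and poke are hypothetical stand-ins for that evaluation and for the sensor's comparison.

```python
from datetime import datetime, timedelta

SLEEP_MINUTES = 11

def parse_dag(now):
    # Mimics the DAG file being evaluated: target_time is derived from "now".
    return (now + timedelta(minutes=SLEEP_MINUTES)).time()

def poke(now, target_time):
    # Same kind of comparison TimeSensor makes: current time of day vs. target.
    return now.time() > target_time

start = datetime(2020, 1, 15, 14, 36, 42)
results = []
for check in range(5):
    now = start + timedelta(minutes=2 * check)  # one check roughly every 2 minutes
    target_time = parse_dag(now)                # target moves forward with each parse
    results.append(poke(now, target_time))

# With a target computed only once, a check 12 minutes later succeeds.
fixed_target = parse_dag(start)
released = poke(start + timedelta(minutes=12), fixed_target)

print(results, released)
```

Every check in the loop fails because the target is always 11 minutes ahead of the clock, while the check against the fixed target succeeds.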

I suggest using TimeDeltaSensor in reschedule mode. The task waits to be scheduled and checks its condition; if the condition is not yet met it is rescheduled, otherwise it succeeds and downstream tasks run.

SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
    task_id="sleep_for_11_min",
    delta=timedelta(minutes=SLEEP_MINUTES_1ST),
    mode='reschedule'               
)

You could also subclass BaseSensorOperator similar to TimeSensor that does a liveliness check to see if the task has been released from sleep. For example,

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults


XCOM_KEY = 'start_date'

class ReleaseProbe(BaseSensorOperator):
    """
    Waits until the job is released from sleep.
    :param sleep_duration: sleep duration of job before it runs
    :type sleep_duration: datetime.timedelta
    """

    @apply_defaults
    def __init__(self, sleep_duration, *args, **kwargs):
        super(ReleaseProbe, self).__init__(*args, **kwargs)
        self.sleep_duration = sleep_duration

    def poke(self, context):
        self.log.info('Checking if task is released after (%s) sleep, execution date is: %s',
                      self.sleep_duration, context['execution_date'])

        ti = context['ti']

        # The first poke records the start time in XCom; later pokes compare against it.
        start_date = ti.xcom_pull(key=XCOM_KEY, task_ids=ti.task_id)
        if not start_date:
            ti.xcom_push(key=XCOM_KEY, value=timezone.utcnow())
            return False

        return timezone.utcnow() - start_date > self.sleep_duration
Demisec answered 16/1, 2020 at 8:59 Comment(1)
Thanks, I have used your code as inspiration for my own Sensor. The TimeDeltaSensor reschedules itself to another schedule even in reschedule mode so it is practically useless. - Shakeup
S
6

Here is an Airflow sensor that "sleeps" the way I presume TimeDeltaSensor should.

It is best used in 'reschedule' mode.

It sleeps relative to the start time of the task instance (here, the TimeSleepSensor operator). By default it "pokes" only once, after the sleep duration has elapsed, and its default timeout expires shortly after the requested sleep_duration in case something causes the poke to fail.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta

class TimeSleepSensor(BaseSensorOperator):
    """
    Waits for specified time interval relative to task instance start

    :param sleep_duration: time after which the job succeeds
    :type sleep_duration: datetime.timedelta
    """

    @apply_defaults
    def __init__(self, sleep_duration, *args, **kwargs):
        super(TimeSleepSensor, self).__init__(*args, **kwargs)
        self.sleep_duration = sleep_duration
        self.poke_interval = kwargs.get('poke_interval',int(sleep_duration.total_seconds()))
        self.timeout = kwargs.get('timeout',int(sleep_duration.total_seconds()) + 30)


    def poke(self, context):
        ti = context["ti"]

        # The target is relative to when this task instance started, not to the schedule.
        sensor_task_start_date = ti.start_date
        target_time = sensor_task_start_date + self.sleep_duration
        now = timezone.utcnow()

        self.log.info(
            "Checking if the target time (%s - check:%s) has come - time to go: %s, start: %s, initial sleep_duration: %s",
            target_time, now > target_time, target_time - now, sensor_task_start_date, self.sleep_duration,
        )

        return now > target_time

Usage is simple:

    sleep_task = TimeSleepSensor(
        task_id="sleep_task",
        sleep_duration=timedelta(minutes=1800),
        mode='reschedule',
    )
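
To see what the defaults in __init__ work out to, here is a sketch in plain Python that mirrors the same arithmetic and the same comparison as poke(). The function names are hypothetical and nothing here touches Airflow.

```python
from datetime import datetime, timedelta

def sleep_sensor_defaults(sleep_duration, grace_seconds=30):
    # Mirrors __init__: poke once per sleep period, time out shortly after.
    seconds = int(sleep_duration.total_seconds())
    return {"poke_interval": seconds, "timeout": seconds + grace_seconds}

def is_released(task_start, sleep_duration, now):
    # Same comparison as poke(): has the clock passed start + sleep_duration?
    return now > task_start + sleep_duration

duration = timedelta(minutes=1800)  # 30 hours, as in the usage example
defaults = sleep_sensor_defaults(duration)
start = datetime(2020, 1, 17, 17, 0)
early = is_released(start, duration, start + timedelta(hours=29))
late = is_released(start, duration, start + timedelta(hours=31))

print(defaults, early, late)
```

For the 1800-minute example this gives a poke_interval of 108000 seconds and a timeout of 108030 seconds, so there is only a 30-second window for the second poke to run before the sensor times out. Passing an explicit timeout may be safer on a busy scheduler.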
Shakeup answered 17/1, 2020 at 17:32 Comment(1)
With Airflow 2 (I'm not sure in exactly which version this changed; I work with 2.3.1), replace ti.start_date with ti.execution_date. - Humboldt
M
0

Well, it's not exactly a solution to your problem but rather an alternative (tested) way.
What you can do is just create a BashOperator and call sleep. I believe it only takes up a single thread, just as the sleep command does in a terminal.

from airflow.operators.bash_operator import BashOperator
sleep_task = BashOperator(
  task_id='sleep_for_eleven_minutes',
  bash_command='sleep 660',
  dag=dag,
)

This way, you get the functionality in the simplest way possible, without using any of the more complex operators.

Mcavoy answered 16/1, 2020 at 7:55 Comment(1)
Unfortunately, that uses one worker for the whole sleeping time just like TimeSensor in 'poke' mode. - Shakeup
P
0

Airflow 2 provides the TaskFlow API, which offers a new way to implement sensors.

Here is an example of a DAG containing a single task that ensures at least 11 minutes have passed since the DAG run's start time. However, because of the poke_interval, this does not guarantee that the task succeeds after exactly 11 minutes.

import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils import timezone

@dag(
    schedule=None,
    start_date=datetime.datetime(2023, 5, 17),
    catchup=False
)
def example():

    @task.sensor(poke_interval=120, timeout=10*10*60, mode="reschedule")
    def sleep_task(duration: datetime.timedelta):
        context = get_current_context()
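        # dag_run is a DagRun object, so start_date is an attribute, not a dict key.
        target_time = context["dag_run"].start_date + duration

        return timezone.utcnow() > target_time

    sleep_task(datetime.timedelta(minutes=11))

# Call the decorated function so the DAG is actually registered.
example()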
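
The effect of poke_interval on the actual wait can be estimated with a little arithmetic. This sketch assumes an idealized timeline in which the first check happens exactly at the DAG run's start and the scheduler adds no latency. The function name is hypothetical.

```python
def first_successful_poke(duration_s, poke_interval_s):
    # Checks happen at 0, p, 2p, ... seconds; the sensor succeeds at the
    # first check that falls strictly after duration_s.
    return (duration_s // poke_interval_s + 1) * poke_interval_s

print(first_successful_poke(11 * 60, 120))  # 720
```

Under those assumptions, an 11-minute wait with a 120-second poke_interval completes at the 12-minute mark. A smaller poke_interval tightens the bound at the cost of more frequent reschedules.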
Propitious answered 17/3, 2023 at 14:40 Comment(0)
