I need to implement the waiting task in Airflow. Waiting time is to be around a couple of hours.
First, TimeDeltaSensor is just not working.
SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
task_id="sleep_for_11_min",
delta=timedelta(minutes=SLEEP_MINUTES_1ST),
)
For daily schedule like:
schedule_interval='30 06 * * *'
Just waits until next schedule:
[2020-01-15 18:10:21,800] {time_delta_sensor.py:45} INFO - Checking if the time (2020-01-16 06:41:00+00:00) has come
That is painfully obvious in code: https://github.com/apache/airflow/blob/master/airflow/sensors/time_delta_sensor.py#L43
(not to mention known bug when using schedule: None or @once)
Next try was with TimeSensor like this:
SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeSensor(
task_id="sleep_for_11_min",
provide_context=True,
target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
trigger_rule=TriggerRule.NONE_FAILED
)
And this actually worked well, but in poke mode it takes one worker for the whole time of the wait time. I received a suggestion to use reschedule mode but by just adding:
mode='reschedule',
generates new schedule on every reschedule check and never finishes like this:
[2020-01-15 15:36:42,818] {time_sensor.py:39} INFO - Checking if the time (14:47:42.707565) has come
[2020-01-15 15:36:42,981] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
....
[2020-01-15 15:38:51,306] {time_sensor.py:39} INFO - Checking if the time (14:49:51.079783) has come
[2020-01-15 15:38:51,331] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
...
[2020-01-15 15:41:00,587] {time_sensor.py:39} INFO - Checking if the time (14:52:00.202168) has come
[2020-01-15 15:41:00,614] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
.....
(note that Airflow is mixing UTC and my timezone UTC+1 in the log here)
The next try is to generate target_time for TimeSensor relative to the execution_date of the DAG. But several tries isn't successful like:
task_target_time = '{{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}'
sleep_task_1 = TimeSensor(
task_id=task_id="sleep_for_11_min",
provide_context=True,
# target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
# target_time = task_target_time,
# target_time=datetime.strptime('{{ execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST) }}','%Y-%m-%dT%H:%M:%S'),
# target_time='{{ execution_date }}'+ timedelta(minutes=SLEEP_MINUTES_1ST),
target_time = ('{{ task_instance.execution_date }}' + timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
poke_interval=120,
mode='reschedule',
timeout=10*60*60,
trigger_rule=TriggerRule.NONE_FAILED
)
In commented lines (target_time.... ) you can see just some of the combinations I have tried. Several failed immediately on DAG creation and some generate the error like this during the run:
[2020-01-15 17:56:39,388] {time_sensor.py:39} INFO - Checking if the time ({{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}) has come
[2020-01-15 17:56:39,389] {taskinstance.py:1058} ERROR - '>' not supported between instances of 'datetime.time' and 'str'
Traceback (most recent call last):
File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
result = task_copy.execute(context=context)
File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/base_sensor_operator.py", line 107, in execute
while not self.poke(context):
File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/time_sensor.py", line 40, in poke
return timezone.utcnow().time() > self.target_time
TypeError: '>' not supported between instances of 'datetime.time' and 'str'
[2020-01-15 17:56:39,390] {taskinstance.py:1089} INFO - Marking task as FAILED.
I think I understand the whole theory - the task context including execution_date isn't available on the operator creation, only during runtime. The Jinja returns Pendulum object that should be converted to time, but Jinja is a String and I don't get Pendulum methods at creation time.
But WHY is it so hard to create simple:
sleep 1000
in Airflow.
(Airflow: v1.10.6, python 3.6.8)