Confused about Airflow's BaseSensorOperator parameters : timeout, poke_interval and mode

I'm a bit confused about how two of BaseSensorOperator's parameters work: timeout and poke_interval. Consider this usage of the sensor:

BaseSensorOperator(
    soft_fail=True,
    poke_interval=4 * 60 * 60,  # Poke every 4 hours
    timeout=12 * 60 * 60,  # Timeout after 12 hours
)

The documentation says the timeout sets the task to 'failed' once it runs out. But since I'm using soft_fail=True, I'd expect it not to behave the same way, yet after using both soft_fail and timeout I found the task failed instead of being skipped.

So what does happen here?

  1. Does the sensor poke every 4 hours and, at every poke, wait for the duration of the timeout (12 hours)?
  2. Or does it poke every 4 hours, for a total of 3 pokes, then time out?
  3. Also, what happens with these parameters if I use the mode="reschedule"?

Here's the docstring of BaseSensorOperator:

class BaseSensorOperator(BaseOperator, SkipMixin):
    """
    Sensor operators are derived from this class and inherit these attributes.
    Sensor operators keep executing at a time interval and succeed when
    a criteria is met and fail if and when they time out.
    :param soft_fail: Set to true to mark the task as SKIPPED on failure
    :type soft_fail: bool
    :param poke_interval: Time in seconds that the job should wait in
        between each try
    :type poke_interval: int
    :param timeout: Time, in seconds before the task times out and fails.
    :type timeout: int
    :param mode: How the sensor operates.
        Options are: ``{ poke | reschedule }``, default is ``poke``.
        When set to ``poke`` the sensor is taking up a worker slot for its
        whole execution time and sleeps between pokes. Use this mode if the
        expected runtime of the sensor is short or if a short poke interval
        is required.
        When set to ``reschedule`` the sensor task frees the worker slot when
        the criteria is not yet met and it's rescheduled at a later time. Use
        this mode if the expected time until the criteria is met is long. The poke
        interval should be more than one minute to prevent too much load on
        the scheduler.
    :type mode: str
    """
Waterspout answered 7/9, 2020 at 9:58

Defining the terms

  1. poke_interval: the duration between successive 'pokes' (evaluations of the condition that is being 'sensed')

  2. timeout: Poking indefinitely is unacceptable (e.g. if your buggy code is poking for the day to become 29 whenever the month is 2, it could keep poking for up to 4 years). So we define a maximum total period, measured from when the sensor starts, beyond which we stop poking and terminate (the sensor is marked either FAILED or SKIPPED)

  3. soft_fail: Normally (when soft_fail=False), the sensor is marked as FAILED after the timeout. When soft_fail=True, the sensor is instead marked as SKIPPED after the timeout

  4. mode: This one is slightly more complex.

    • Any task (including a sensor), when it runs, occupies a slot in some pool (either the default pool or an explicitly specified pool), which essentially means that it takes up resources.
    • For sensors, this is
      • wasteful: a slot is consumed even when we are just waiting (doing no actual work)
      • dangerous: if your workflow has too many sensors that start sensing around the same time, they can tie up a lot of resources for quite a while. In fact, having too many ExternalTaskSensors is notorious for putting entire workflows (DAGs) into deadlocks
    • To overcome this problem, Airflow v1.10.2 introduced modes in sensors
      • mode='poke' (default) means the existing behaviour that we discussed above
      • mode='reschedule' means that after a failed poke attempt, rather than going to sleep, the sensor ends its current run by raising AirflowRescheduleException, and its status changes from RUNNING to UP_FOR_RESCHEDULE (not UP_FOR_RETRY). That way, it releases its slot, allowing other tasks to progress while it waits for the next poke attempt
    • Here is the relevant snippet from the code:
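    if self.reschedule:
        # reschedule mode: end this run and ask the scheduler to run the task
        # again at reschedule_date, freeing the worker slot in the meantime
        reschedule_date = timezone.utcnow() + timedelta(
            seconds=self._get_next_poke_interval(started_at, try_number))
        raise AirflowRescheduleException(reschedule_date)
    else:
        # poke mode: keep the worker slot and sleep in-process until the next poke
        sleep(self._get_next_poke_interval(started_at, try_number))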
        try_number += 1
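To make these branches concrete, here is a simplified model, in plain Python and not Airflow's code, of what happens after a poke returns False. The function name after_failed_poke and its string return values are hypothetical; the model ignores retries and scheduler latency, and assumes the timeout is checked against the time elapsed since the sensor first started, before the mode is consulted.

```python
# Simplified model of a sensor's decision after a poke returns False.
# Hypothetical helper for illustration; this is not Airflow's code and it
# ignores retries and scheduler latency.

def after_failed_poke(elapsed_s: float, timeout_s: float,
                      soft_fail: bool, mode: str) -> str:
    """Return the task state that follows a failed poke (modelled)."""
    # 1. The timeout limits the total time since the sensor started.
    if elapsed_s > timeout_s:
        return "SKIPPED" if soft_fail else "FAILED"
    # 2. Still within the timeout: the mode only decides how to wait.
    if mode == "reschedule":
        # Task ends this run and releases its worker slot until rescheduled.
        return "UP_FOR_RESCHEDULE"
    if mode == "poke":
        # Task stays RUNNING and sleeps, keeping its worker slot.
        return "RUNNING"
    raise ValueError(f"unknown mode: {mode!r}")


HOURS = 60 * 60
print(after_failed_poke(4 * HOURS, 12 * HOURS, soft_fail=True, mode="poke"))         # RUNNING
print(after_failed_poke(4 * HOURS, 12 * HOURS, soft_fail=True, mode="reschedule"))   # UP_FOR_RESCHEDULE
print(after_failed_poke(13 * HOURS, 12 * HOURS, soft_fail=True, mode="reschedule"))  # SKIPPED
```

Because the timeout check comes first in this model, the mode never changes when the sensor gives up; it only changes how the sensor waits in between.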
    

And now answering your questions directly

Q1

  1. Does the sensor poke every 4 hours and, at every poke, wait for the duration of the timeout (12 hours)?
  2. Or does it poke every 4 hours, for a total of 3 pokes, then time out?

Point 2 is correct.
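A rough way to check this is to simulate a sensor whose condition is never met. This is a hypothetical model, not Airflow's implementation, and poke_start_times is an illustrative name: it assumes the first poke starts immediately, each poke takes about one second, and the elapsed time is compared with the timeout after every failed poke. Under those assumptions the 12 hours is a budget for the whole sensor run, not a wait per poke: there is the immediate first poke plus one after each of the three 4-hour intervals, and the sensor gives up right after the poke near the 12-hour mark.

```python
# Hypothetical model of poke mode when the sensed condition never becomes true.
# Assumptions: the first poke starts at t=0, each poke takes poke_duration_s,
# and elapsed time is compared with the timeout after every failed poke.

def poke_start_times(poke_interval_s: int, timeout_s: int,
                     poke_duration_s: int = 1) -> list:
    starts = []
    t = 0  # seconds since the sensor started
    while True:
        starts.append(t)
        t += poke_duration_s      # the poke itself (always returns False here)
        if t > timeout_s:         # total elapsed time exceeds the timeout
            return starts         # sensor gives up (FAILED or SKIPPED)
        t += poke_interval_s      # sleep until the next poke


HOURS = 60 * 60
starts = poke_start_times(poke_interval_s=4 * HOURS, timeout_s=12 * HOURS)
print(len(starts))                             # 4
print([round(s / HOURS, 2) for s in starts])   # [0.0, 4.0, 8.0, 12.0]
```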

Q2

Also, what happens with these parameters if I use the mode="reschedule"?

As explained earlier, each of those params is independent, and setting mode='reschedule' doesn't alter their behaviour in any way; it only changes whether the sensor holds on to its worker slot between pokes.
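The slot-usage difference can be sketched with the same kind of toy model. Again this is a hypothetical simplification, not Airflow's code: slot_seconds is an illustrative name, scheduler latency between reschedules is ignored, each poke is assumed to take one second, and the timeout is assumed to be measured from the sensor's first start in both modes. Both modes poke at the same times and time out at the same point; only the time a worker slot is occupied differs.

```python
# Hypothetical model comparing how long a worker slot is held in each mode
# when the condition is never met. Assumptions: each poke takes
# poke_duration_s, scheduler latency between reschedules is ignored, and in
# both modes the timeout is measured from the sensor's first start.

def slot_seconds(mode: str, poke_interval_s: int, timeout_s: int,
                 poke_duration_s: int = 1) -> int:
    if mode not in ("poke", "reschedule"):
        raise ValueError(f"unknown mode: {mode!r}")
    held = 0      # seconds a worker slot is occupied
    elapsed = 0   # seconds since the sensor first started
    while True:
        held += poke_duration_s     # a slot is needed while poking, in any mode
        elapsed += poke_duration_s
        if elapsed > timeout_s:     # same timeout rule in both modes
            return held
        elapsed += poke_interval_s
        if mode == "poke":
            held += poke_interval_s  # sleeping inside the worker keeps the slot


HOURS = 60 * 60
print(slot_seconds("poke", 4 * HOURS, 12 * HOURS))        # 43204 (about 12 hours)
print(slot_seconds("reschedule", 4 * HOURS, 12 * HOURS))  # 4 (one second per poke)
```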

Ridgeway answered 7/9, 2020 at 11:44
Thank you for your thorough explanation. The way the mode=reschedule parameter works indeed seems like the logical default behavior we'd want any sensor to have. E.g. if a DAG runs 30 subdags, each responsible for the ETL of one data file, then 30 sensors poking and waiting seems like a really bad idea. - Waterspout
I'll give you that, @Aetos. I think that in the early days they may not have envisioned sensors being used so much, and over the years Airflow's users and use cases have advanced a great deal. Even so, mode='poke' still makes a strong case (and actually minimizes repetitive scheduling overhead) when poke_interval is small (say ~1 min). The Astronomer.io folks say: "'poke' mode: Use this mode if the expected runtime of the sensor is short or if a short poke interval is required." - Ridgeway
BaseSensorOperator(
    soft_fail=True,
    poke_interval=4 * 60 * 60,  # Poke every 4 hours
    timeout=12 * 60 * 60,  # Timeout of 12 hours
    mode="reschedule",
)

Let's say the criterion is not met at the first poke. The sensor will then poke again after the 4-hour interval, but the worker slot is freed during the wait because we're using mode="reschedule".

That is what I understood.

Gatha answered 12/10, 2020 at 10:31
