Event-based triggering: running an Airflow task when a file is dropped into an S3 bucket

Is it possible to run an Airflow task only when a specific event occurs, such as a file being dropped into a specific S3 bucket? Something similar to AWS Lambda events.

There is S3KeySensor, but I don't know whether it does what I want (run the task only when an event occurs).

Here is an example to make the question clearer.

I have a sensor object as follows:

# Airflow 1.10.x-style import; `dag` is the DAG object defined elsewhere in this file
from airflow.sensors.s3_key_sensor import S3KeySensor

sensor = S3KeySensor(
    task_id='run_on_every_file_drop',
    bucket_key='file-to-watch-*',     # match any object whose key starts with 'file-to-watch-'
    wildcard_match=True,
    bucket_name='my-sensor-bucket',
    timeout=18*60*60,                 # give up after 18 hours
    poke_interval=120,                # re-check every 2 minutes
    dag=dag
)

Using the above sensor object, Airflow's behavior for the sensor task is as follows:

  • It runs the task if there is already an object matching the wildcard in the S3 bucket my-sensor-bucket, even before the DAG is switched ON in the Airflow admin UI (I don't want the task to run because of pre-existing S3 objects).
  • After running once, the sensor task does not run again when a new S3 object is dropped (I want the sensor task and the subsequent tasks in the DAG to run every single time a new S3 object is dropped into the bucket my-sensor-bucket).
  • If you configure the scheduler, tasks run on a schedule, not on events, so the scheduler does not seem to be an option here.

I'm trying to understand whether tasks in Airflow can only be run on a schedule (like cron jobs) or via sensors (once, when the sensing criteria are met), or whether it can be set up as an event-based pipeline (something similar to AWS Lambda).

Meridel answered 4/11, 2019 at 13:52 Comment(4)
Did you figure out a solution to this? – Terbium
@Terbium Not yet. I decided to look into Argo Workflows; the project looks promising. It has Argo Events, which might be a solution to the problem in this question, but the catch is that you have to be on board with Kubernetes. – Meridel
Airflow newbie with the same question here. Would deferrable triggers & operators resolve this? (I don't know if they were added after this question was posted.) – Hitchcock
The problem with Argo Workflows, to me, is that you would be mixing infrastructure code with operational code. Infrastructure code ideally shouldn't know the function of the pieces, just manage their lifecycle. But you may have a different, still valid, approach to that. – Hitchcock

I should note that this answer was written for the 1.x (1.10.x) line of Airflow. Airflow 2.x introduced a concept called triggers that I haven't played with as much yet. It can help by letting a sensor become async: the sensor check is redone when a trigger is received. I think it's possible to have triggers based on file availability, and the AWS and GCP provider packages define some triggers; e.g., for S3 there are these docs.
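
To make that deferrable/trigger path concrete, here is a minimal sketch (illustrative only, not verbatim from the docs). It assumes Airflow 2.4+ and a recent apache-airflow-providers-amazon release in which S3KeySensor accepts deferrable=True; the DAG id is made up, and the bucket/key come from the question.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(
    dag_id="s3_deferrable_sensor_example",   # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule=None,                           # started manually or by an external trigger
    catchup=False,
) as dag:
    wait_for_file = S3KeySensor(
        task_id="wait_for_file_drop",
        bucket_name="my-sensor-bucket",      # bucket from the question
        bucket_key="file-to-watch-*",
        wildcard_match=True,
        deferrable=True,                     # hand the wait off to the triggerer (assumed provider support)
        poke_interval=120,
        timeout=18 * 60 * 60,
    )

With deferrable=True the wait runs in the triggerer process, so the sensor doesn't occupy a worker slot between checks; you still need something (a schedule or an external trigger) to start a DAG run in the first place.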

Original answer follows

Airflow is fundamentally organized around time-based scheduling.

You can hack around this to get what you want in a few ways, though:

  1. Configure an S3 event notification (e.g. via SQS) that triggers an AWS Lambda, and have the Lambda call the Airflow API to trigger a DAG run (see the Lambda sketch after this list).
  2. Make the DAG start with an SQS sensor; when it receives the S3 change event, it simply proceeds with the rest of the DAG (see 3.1 and 3.2 for rescheduling).
  3. Make the DAG start with a sensor (like the one you show). It doesn't choose which task to run; it just passes control to the next dependent tasks or times out. You'd have to delete the key that made the sensor match.
    1. Re-run by having the final task re-trigger the DAG.
    2. Or set the schedule interval to every minute, with catchup disabled and max active DAG runs set to 1. That way one run is active and the sensor holds it until it times out; when it completes or times out, the next run starts within a minute (a rough DAG sketch follows below).
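
As a sketch of route 1 (illustrative only): a Lambda handler, subscribed directly to the S3 event notification, that calls the Airflow 2.x stable REST API to trigger a DAG run. The host, credentials, and DAG id are placeholders; on the 1.10.x line the (since-removed) experimental API served the same purpose, and if the notification is routed through SQS first, the event payload is nested differently.

import base64
import json
import urllib.request

# Placeholders: swap in your Airflow host, credentials, and DAG id.
AIRFLOW_URL = "https://airflow.example.com/api/v1/dags/s3_file_pipeline/dagRuns"
AIRFLOW_AUTH = base64.b64encode(b"api_user:api_password").decode()


def handler(event, context):
    # S3 -> Lambda notifications arrive as a list of Records; pass each object
    # key to the DAG run through the `conf` payload.
    for record in event.get("Records", []):
        key = record["s3"]["object"]["key"]
        body = json.dumps({"conf": {"s3_key": key}}).encode()
        req = urllib.request.Request(
            AIRFLOW_URL,
            data=body,
            method="POST",
            headers={
                "Content-Type": "application/json",
                "Authorization": "Basic " + AIRFLOW_AUTH,
            },
        )
        with urllib.request.urlopen(req) as resp:
            print("Triggered DAG run, HTTP status:", resp.status)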

If you go with route 3, you'll need to delete the keys that passed the sensor before the next run of the DAG and its sensor. Note that, because of S3's eventual consistency, routes 1 and 2 are more reliable.
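
To make route 3.2 concrete, here is a rough sketch (again illustrative, written against 1.10.x-style imports; the DAG id, the processing task, and the prefix-based cleanup are assumptions). The DAG starts with the sensor from the question, runs every minute with catchup disabled and a single active run, and ends by deleting the matched objects so the next run's sensor only fires on genuinely new files.

from datetime import datetime

import boto3

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.s3_key_sensor import S3KeySensor

BUCKET = "my-sensor-bucket"
PREFIX = "file-to-watch-"   # crude simplification of the 'file-to-watch-*' wildcard


def process_file():
    # Placeholder for whatever work the pipeline does with the new file(s).
    print("processing newly dropped file(s)")


def delete_matched_keys():
    # Delete every object under the watched prefix so the next run's sensor
    # doesn't match the same files again. Uses ambient AWS credentials.
    s3 = boto3.client("s3")
    listing = s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX)
    for obj in listing.get("Contents", []):
        s3.delete_object(Bucket=BUCKET, Key=obj["Key"])


dag = DAG(
    dag_id="s3_event_style_pipeline",      # hypothetical DAG id
    start_date=datetime(2019, 11, 1),
    schedule_interval="* * * * *",         # every minute
    catchup=False,
    max_active_runs=1,
)

sensor = S3KeySensor(
    task_id="run_on_every_file_drop",
    bucket_key="file-to-watch-*",
    wildcard_match=True,
    bucket_name=BUCKET,
    timeout=18 * 60 * 60,
    poke_interval=120,
    dag=dag,
)

process = PythonOperator(task_id="process_file", python_callable=process_file, dag=dag)
cleanup = PythonOperator(task_id="delete_matched_keys", python_callable=delete_matched_keys, dag=dag)

sensor >> process >> cleanup

Deleting by prefix is blunt; the event-notification routes (1 and 2) avoid the cleanup entirely, which is part of why they are more reliable.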

Newhouse answered 7/9, 2020 at 12:20 Comment(2)
Does this mean that Airflow is not meant for handling this kind of event-based triggering? I have a use case where I need to trigger ~1000 instances of the same DAG per day (whenever I receive a new file), and that can last for days (due to missing dependencies). – Eastward
@Eastward I'd say it is not meant for that. If you write a master DAG that triggers the other 1000 instances, and you have an S3 event trigger a Lambda function which then uses the API to trigger that master DAG, you will still see some scheduling lag for the 1000 other runs. That said, if you are on Airflow 2 with Smart Sensors, you might have a better experience just making long-running tasks listen for the event... and when the DAG completes, it re-triggers its starting sensor. Not quite how Airflow was first meant to be used, but possible. – Newhouse

I know this is an older thread, but there is a pretty clean way to add and control event-based triggers to Airflow using Stonebranch.

Cathe answered 2/3, 2023 at 14:41 Comment(1)
Your answer could be improved with additional supporting information. Please edit to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers in the help center. – Proceeding
