Airflow: Proper way to run DAG for each file

I have the following task to solve:

Files are sent at irregular times through an endpoint and stored locally. I need to trigger a DAG run for each of these files, and for each file the same tasks will be performed.

Overall, the flow looks as follows: for each file, run tasks A->B->C->D.

Files are processed in batches. While this task seemed trivial to me, I have found several ways to do it and I am confused about which one is the "proper" one (if any).

First pattern: Use the experimental REST API to trigger the DAG.

That is, expose a web service which ingests the request and the file, stores it in a folder, and uses the experimental REST API to trigger the DAG, passing the file_id as conf.
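A minimal sketch of the web-service side of this pattern, assuming Flask, the Airflow 1.10 experimental endpoint, and a hypothetical ingest_file DAG; the folder, URL, and DAG id are illustrative, and the exact conf payload format may vary between Airflow versions:

```python
import json
import uuid
from pathlib import Path

import requests
from flask import Flask, request

app = Flask(__name__)

INBOX = Path("/data/inbox")            # local folder where uploaded files are stored
AIRFLOW_URL = "http://localhost:8080"  # Airflow webserver
DAG_ID = "ingest_file"                 # hypothetical processing DAG


@app.route("/upload", methods=["POST"])
def upload():
    # Store the uploaded file under a unique id.
    file_id = uuid.uuid4().hex
    (INBOX / f"{file_id}.json").write_bytes(request.get_data())

    # Trigger one DAG run per file through the experimental REST API,
    # passing the file_id as conf (encoded as a JSON string, as in the docs' example).
    resp = requests.post(
        f"{AIRFLOW_URL}/api/experimental/dags/{DAG_ID}/dag_runs",
        json={"conf": json.dumps({"file_id": file_id})},
    )
    resp.raise_for_status()
    return {"file_id": file_id}, 201
```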

Cons: the REST API is still experimental, and I am not sure how Airflow would cope with many requests arriving at the same time (which shouldn't happen, but what if it does?).

Second pattern: 2 DAGs. One senses and triggers with the TriggerDagRunOperator, one processes.

Still using the same web service as described before, but this time it just stores the file. Then we have:

  • First DAG: uses a FileSensor along with the TriggerDagRunOperator to trigger N DAG runs for N files
  • Second DAG: tasks A->B->C

Cons: I need to avoid the same file being sent to two different DAG runs. Example:

There is a file x.json in the folder. The sensor finds x and triggers DAG run (1).

The sensor goes back and is scheduled again. If DAG run (1) has not processed/moved the file yet, the sensor DAG might schedule a new DAG run with the same file, which is unwanted.
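For illustration, a rough sketch of what the sensing/triggering DAG could look like with Airflow 2.x operators; the folder paths, connection id, and the process_file DAG id are assumptions, and it relies on TriggerDagRunOperator's conf field being templated (the case in recent 2.x releases). Moving the file out of the inbox before triggering is one way to avoid the double-scheduling problem described above:

```python
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.filesystem import FileSensor

INBOX = Path("/data/inbox")
CLAIMED = Path("/data/processing")


def claim_one_file() -> str:
    """Move exactly one file out of the inbox so a later sensor run cannot pick it up again."""
    candidate = next(INBOX.glob("*.json"))
    target = CLAIMED / candidate.name
    candidate.rename(target)  # atomic on the same filesystem
    return str(target)        # pushed to XCom for the trigger task


with DAG(
    dag_id="sense_and_trigger",
    start_date=datetime(2020, 2, 1),
    schedule_interval="*/5 * * * *",
    max_active_runs=1,        # avoid two sensor runs racing for the same file
    catchup=False,
) as dag:
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        fs_conn_id="fs_default",  # filesystem connection pointing at /data/inbox
        filepath="*.json",
        poke_interval=30,
    )
    claim = PythonOperator(task_id="claim_file", python_callable=claim_one_file)
    trigger = TriggerDagRunOperator(
        task_id="trigger_processing",
        trigger_dag_id="process_file",  # the second DAG (tasks A->B->C)
        conf={"path": "{{ ti.xcom_pull(task_ids='claim_file') }}"},
    )

    wait_for_file >> claim >> trigger
```

Note that this claims a single file per DAG run; with many files arriving at once you would either rely on a short schedule interval or reach for an operator that can trigger several runs at once.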

Third pattern: for file in files, task A->B->C

As seen in this question.

Cons: this could work; however, what I dislike is that the UI will probably get messy, because every DAG run will not look the same and will instead change with the number of files being processed. Also, if there are 1000 files to be processed, the run would probably be very difficult to read.
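A condensed sketch of this pattern (paths and the step callable are illustrative): the DAG's shape is recomputed every time the file is parsed, which is exactly what makes the UI vary between runs.

```python
import glob
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def run_step(step: str, path: str, **_):
    print(f"running step {step} for {path}")


with DAG(
    dag_id="process_all_files",
    start_date=datetime(2020, 2, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    # One A->B->C chain per file found at parse time; the graph changes whenever
    # the contents of the folder change.
    for i, path in enumerate(sorted(glob.glob("/data/inbox/*.json"))):
        a, b, c = (
            PythonOperator(
                task_id=f"{step}_{i}",
                python_callable=run_step,
                op_kwargs={"step": step, "path": path},
            )
            for step in ("a", "b", "c")
        )
        a >> b >> c
```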

Fourth pattern: Use subdags

I am not yet sure how they work exactly, as I have seen they are not encouraged (at the end), but it should be possible to spawn a SubDAG for each file and have it run. Similar to this question.

Cons: it seems that SubDAGs can only be used with the SequentialExecutor.
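For completeness, a rough sketch of the SubDAG variant with the Airflow 1.10-era SubDagOperator (current at the time of this question); the paths and placeholder tasks are illustrative, and SubDAGs are generally discouraged.

```python
import glob
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {"start_date": datetime(2020, 2, 1)}


def per_file_subdag(parent_dag_id: str, child_task_id: str, path: str) -> DAG:
    # The child dag_id must be "<parent>.<task_id>" for the SubDagOperator to find it.
    subdag = DAG(
        dag_id=f"{parent_dag_id}.{child_task_id}",
        default_args=DEFAULT_ARGS,
        schedule_interval=None,
    )
    # Placeholders for tasks A->B->C; the real tasks would operate on `path`.
    a = DummyOperator(task_id="a", dag=subdag)
    b = DummyOperator(task_id="b", dag=subdag)
    c = DummyOperator(task_id="c", dag=subdag)
    a >> b >> c
    return subdag


with DAG(
    dag_id="process_with_subdags",
    default_args=DEFAULT_ARGS,
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    for i, path in enumerate(sorted(glob.glob("/data/inbox/*.json"))):
        SubDagOperator(
            task_id=f"file_{i}",
            subdag=per_file_subdag("process_with_subdags", f"file_{i}", path),
        )
```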


Am I missing something and over-thinking something that should be (in my mind) quite straightforward? Thanks

Ripply answered 5/2, 2020 at 19:6

I found this article: https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13

where a new operator, namely TriggerMultiDagRunOperator, is used. I think this suits my needs.

Ripply answered 8/2, 2020 at 12:51

I know I am late, but I would choose the second pattern ("2 DAGs: one senses and triggers with the TriggerDagRunOperator, one processes"), because:

  • Every file can be processed in parallel
  • The first DAG could pick a file to process and rename it (adding a '_processing' suffix or moving it to a processing folder)
  • If I am a new developer in your company and I open the workflow, I want to understand what the logic of the workflow is, rather than which files happened to be processed the last time the DAG was dynamically built
  • If DAG 2 finds an issue with the file, it renames it (adding an '_error' suffix or moving it to an error folder)
  • It's a standard way to process files without creating any additional operator
  • It makes the DAG idempotent and easier to test. More info in this article

Renaming and/or moving files is a pretty standard way to process files in every ETL.
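A small sketch of the claim/rename idea described above (the folder names are illustrative); helpers like these would sit in the first DAG's trigger step and in the second DAG's success/failure handling:

```python
from pathlib import Path

INBOX = Path("/data/inbox")
PROCESSING = Path("/data/processing")
DONE = Path("/data/done")
ERROR = Path("/data/error")


def claim_file() -> str:
    """Pick one file and move it out of the inbox so no other DAG run can grab it."""
    path = next(INBOX.glob("*.json"))
    claimed = PROCESSING / path.name
    path.rename(claimed)  # atomic on the same filesystem
    return str(claimed)


def mark_done(path: str) -> None:
    src = Path(path)
    src.rename(DONE / src.name)


def mark_error(path: str) -> None:
    src = Path(path)
    src.rename(ERROR / src.name)
```

Because each run works on a file that has already been moved out of the inbox, re-running the DAG with the same conf is safe, which is the idempotency point made above.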

By the way, I always recommend this article: https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753

Maxi answered 15/5, 2020 at 13:20 Comment(2)
Thanks for your kind answer; even if late, it's very appreciated. – Ripply
Important to note here is the second bullet: "The first DAG could pick a file to process...". It should pick only one file to process when triggering the second DAG with the TriggerDagRunOperator. The extension, TriggerMultiDagRunOperator, overcomes that limitation. – Rockbound

It seems like you should be able to run a batch-processor DAG with a BashOperator to clear the folder. Just make sure you set depends_on_past=True on your DAG, so that the folder has been successfully cleared before the next time the DAG is scheduled.
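A rough sketch of that setup, assuming Airflow 2.x imports and an illustrative folder path; the processing step is a placeholder:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def process_folder(**_):
    """Placeholder for running tasks A->B->C over every file currently in the folder."""


with DAG(
    dag_id="batch_process",
    start_date=datetime(2020, 2, 1),
    schedule_interval="@hourly",
    catchup=False,
    # each task instance waits for its counterpart in the previous run to succeed
    default_args={"depends_on_past": True},
) as dag:
    process = PythonOperator(task_id="process_batch", python_callable=process_folder)
    clear = BashOperator(task_id="clear_folder", bash_command="rm -f /data/inbox/*.json")

    process >> clear
```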

Assegai answered 6/2, 2020 at 0:18

As of Airflow 2.3.0, you can use Dynamic Task Mapping, which was added to support use cases like this:

https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#
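A short sketch of what that can look like with the TaskFlow API (Airflow 2.3+); the folder path and task bodies are illustrative:

```python
from datetime import datetime
from typing import List

from airflow.decorators import dag, task


@dag(start_date=datetime(2022, 9, 1), schedule_interval="@hourly", catchup=False)
def process_incoming_files():
    @task
    def list_files() -> List[str]:
        import glob
        return sorted(glob.glob("/data/inbox/*.json"))

    @task
    def process(path: str) -> str:
        print(f"processing {path}")
        return path

    # One mapped task instance is created per file found at run time.
    process.expand(path=list_files())


process_incoming_files()
```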

Baskin answered 13/9, 2022 at 18:56 Comment(2)
This is the best approach for recent versions. An example is here: https://mcmap.net/q/128093/-proper-way-to-create-dynamic-workflows-in-airflow – Drought
This approach sucks. By default it is limited to 1024 dynamic tasks per DAG run. I raised the limit for my use case, which was about 7000 dynamic tasks, and Airflow became really slow and CPU-intensive. I would not recommend this approach for this type of issue. – Cedillo
