Sharing large intermediate state between Airflow tasks

2

15

We have an Airflow deployment with Celery executors.

Many of our DAGs require a local processing step on some file, in a BashOperator or PythonOperator.

However, in our understanding the tasks of a given DAG may not always be scheduled on the same machine.

The options for state sharing between tasks I've gathered so far:

  1. Use Local Executors - this may suffice for one team, depending on the load, but may not scale to the wider company

  2. Use XCom - does this have a size limit? Probably unsuitable for large files

  3. Write custom Operators for every combination of tasks that need local processing in between. This approach reduces modularity of tasks and requires replicating existing operators' code.

  4. Use Celery queues to route DAGs to the same worker (docs) - This option seems attractive at first, but what would be an appropriate way to set it up in order to avoid routing everything to one executor, or crafting a million queues?

  5. Use a shared network storage in all machines that run executors - Seems like an additional infrastructure burden, but is a possibility.

What is the recommended way to share large intermediate state, such as files, between tasks in Airflow?

Semiramis answered 12/2, 2018 at 21:40 Comment(1)
If you have a large dataset that needs to be shared, consider a data lake such as S3. Airflow is not a data streaming solution. Tasks do not move data from one to the other (though tasks can exchange metadata!), based on the docs: airflow.apache.org - Oof
7

To clarify something: no matter how you set up Airflow, there will only be one executor running.

  • The executor runs on the same machine as the scheduler.
  • Currently (Airflow 1.9.0 at the time of writing) there is no safe way to run multiple schedulers, so there will only ever be one executor running.
  • The Local Executor executes tasks on the same machine as the scheduler.
  • The Celery Executor just puts tasks in a queue to be worked on by the Celery workers.

However, the question you are asking does apply to Celery workers. If you use the Celery Executor you will probably have multiple Celery workers, and consecutive tasks of a DAG may land on different ones.

Using network shared storage solves multiple problems:

  • Each worker machine sees the same dags because they have the same dags folder
  • Results of operators can be stored on a shared file system
  • The scheduler and webserver can also share the dags folder and run on different machines

I would use network storage and write the output file name to XCom. Then, when a task needs the output of a previous task, it reads the file name from that task's XCom and processes that file.
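The pattern described above can be sketched roughly as follows. This is a minimal illustration, not a definitive implementation: it assumes every worker mounts the same share, and the `SHARED_DATA_DIR` variable, the task id `produce`, the file contents and the `FakeTaskInstance` helper are all made up for the example. Only `xcom_pull(task_ids=...)` and the fact that a PythonOperator pushes its callable's return value to XCom come from Airflow itself.

```python
import os
import tempfile
import uuid

# Assumption: every Celery worker mounts the same network share at this path.
# The variable name and the temp-dir fallback are illustrative only.
SHARED_DIR = os.environ.get("SHARED_DATA_DIR", tempfile.gettempdir())


def produce(**context):
    """Write the large intermediate result to shared storage; return only its path."""
    path = os.path.join(SHARED_DIR, "intermediate_{}.csv".format(uuid.uuid4().hex))
    with open(path, "w") as f:
        f.write("id,value\n1,42\n")
    # A PythonOperator pushes the callable's return value to XCom, so only
    # this short string goes through the metadata database, not the file.
    return path


def consume(**context):
    """Read the path pushed by the upstream task and process that file."""
    path = context["ti"].xcom_pull(task_ids="produce")
    with open(path) as f:
        return sum(1 for _ in f) - 1  # number of data rows, header excluded


class FakeTaskInstance(object):
    """Hypothetical stand-in for Airflow's TaskInstance, for a dry run outside Airflow."""

    def __init__(self):
        self.xcoms = {}

    def xcom_pull(self, task_ids):
        return self.xcoms[task_ids]


if __name__ == "__main__":
    ti = FakeTaskInstance()
    ti.xcoms["produce"] = produce(ti=ti)
    print(consume(ti=ti))
```

In a DAG, the two callables would be wrapped in `PythonOperator(task_id="produce", python_callable=produce, ...)` and a matching consumer task; on Airflow 1.x the operators also need `provide_context=True` so that `ti` is passed in. Cleaning up the intermediate files is left out of the sketch.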

Lorettelorgnette answered 14/2, 2018 at 15:44 Comment(1)
How would you approach this if you had to execute a Python script stored in S3? If I have two tasks, where one downloads the script to local disk and the next one executes it, it might fail with a file-not-found error, since the download task and the execute task might run on different machines, right? - Erl
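One way to sidestep the problem raised in this comment, short of shared storage, is to keep the download and the execution inside a single task, so both steps necessarily run on the same worker. A minimal sketch under that assumption follows; `fetch_script` is a hypothetical callable that writes the script to the given path (in practice it would wrap whatever S3 client is in use), and the file name `job.py` is arbitrary.

```python
import os
import shutil
import subprocess
import sys
import tempfile


def download_and_run(fetch_script, **context):
    """Fetch a script and execute it within one task, so both steps share a machine.

    `fetch_script` is a hypothetical callable taking a destination path.
    """
    workdir = tempfile.mkdtemp()
    try:
        script_path = os.path.join(workdir, "job.py")
        fetch_script(script_path)
        # Run with the worker's own interpreter; check_output raises
        # CalledProcessError if the script exits with a non-zero status.
        output = subprocess.check_output([sys.executable, script_path])
        return output.decode("utf-8").strip()
    finally:
        # Remove the local copy whether or not the script succeeded.
        shutil.rmtree(workdir, ignore_errors=True)
```

With a PythonOperator, the fetch function could be supplied through `op_kwargs`. The trade-off is the one named in option 3 of the question: the two steps are no longer separately retryable or reusable.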
0

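Change the datatype of the value column in the xcom table of the Airflow metastore (the value column, not key, is what holds the serialized payload). On MySQL its default datatype is BLOB, which holds at most 64 KB; change it to LONGBLOB. That lets you store up to 4 GB per XCom value between tasks.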

Gains answered 20/1, 2023 at 14:2 Comment(0)
