How to control the parallelism or concurrency of an Airflow installation?
In some of my Apache Airflow installations, DAGs or tasks that are scheduled to run do not run even when the scheduler doesn't appear to be fully loaded. How can I increase the number of DAGs or tasks that can run concurrently?

Similarly, if my installation is under high load and I want to limit how quickly my Airflow workers pull queued tasks (such as to reduce resource consumption), what can I adjust to reduce the average load?

Gabby answered 30/5, 2019 at 2:0 Comment(0)
Here's an expanded list of the configuration options available since Airflow v1.10.2. Some can be set on a per-DAG or per-operator basis; when not specified, they fall back to the installation-wide defaults.


Options that can be specified on a per-DAG basis:

  • concurrency: the number of task instances allowed to run concurrently across all active runs of the DAG this is set on. Defaults to core.dag_concurrency if not set
  • max_active_runs: maximum number of active runs for this DAG. The scheduler will not create new active DAG runs once this limit is hit. Defaults to core.max_active_runs_per_dag if not set

Examples:

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)

# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)

Options that can be specified on a per-operator basis:

  • pool: the pool to execute the task in. Pools can be used to limit parallelism for only a subset of tasks
  • max_active_tis_per_dag: controls the number of concurrent running task instances across dag_runs per task.

Example:

t1 = BaseOperator(pool='my_custom_pool', max_active_tis_per_dag=12)
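A pool referenced this way must already exist; you can create one from the web UI or the CLI. A sketch using the Airflow 2.x CLI (the pool name, slot count, and description are illustrative):

```shell
# Create (or update) a pool named my_custom_pool with 16 slots
airflow pools set my_custom_pool 16 "Pool for my rate-limited tasks"

# Verify the pool exists
airflow pools list
```

Tasks assigned to the pool will collectively never occupy more than the configured number of slots, regardless of the installation-wide parallelism settings.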

Options that are specified across an entire Airflow setup:

  • core.parallelism: maximum number of tasks running across an entire Airflow installation
  • core.dag_concurrency: max number of tasks that can be running per DAG (across multiple DAG runs)
  • core.non_pooled_task_slot_count: number of task slots allocated to tasks not running in a pool
  • core.max_active_runs_per_dag: maximum number of active DAG runs, per DAG
  • scheduler.max_threads: how many threads the scheduler process should use to schedule DAGs
  • celery.worker_concurrency: max number of task instances that a worker will process at a time if using CeleryExecutor
  • celery.sync_parallelism: number of processes CeleryExecutor should use to sync task state
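These installation-wide options live in airflow.cfg under their respective sections (or can be set via the matching AIRFLOW__SECTION__KEY environment variables, e.g. AIRFLOW__CORE__PARALLELISM). A sketch with illustrative values, not recommendations:

```ini
[core]
parallelism = 32                  ; max task instances running across the whole installation
dag_concurrency = 16              ; max running tasks per DAG, across its runs
non_pooled_task_slot_count = 128  ; slots for tasks not assigned to a pool
max_active_runs_per_dag = 16      ; max active DAG runs per DAG

[scheduler]
max_threads = 2                   ; threads the scheduler uses to schedule DAGs

[celery]
worker_concurrency = 16           ; task instances one Celery worker processes at a time
sync_parallelism = 0              ; processes CeleryExecutor uses to sync task state
```

Note the layering: core.parallelism caps the whole installation, core.dag_concurrency caps each DAG, and pools or per-operator settings cap individual subsets of tasks within those limits.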
Gabby answered 30/5, 2019 at 2:0 Comment(2)
Pool worked for me but task_concurrency/max_active_tis_per_dag did not. I am using dynamic tasks, so task concurrency likely applies to each dynamic task separately since they all have different task IDs. Hope this helps someone with dynamic tasks.Taliped
@Taliped there is a new option in v2.6 to handle controlling dynamic tasks: max_active_tis_per_dagrun (see the docs). Maybe @hexacyanide can update his answer?Xanthochroid
An illustration of the three major concurrency-control variables (image omitted):

As of Airflow 2.2, the task_concurrency parameter is deprecated in favor of max_active_tis_per_dag.
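Airflow 2.2 renamed the DAG-level option as well: concurrency became max_active_tasks. A sketch of the new names, assuming apache-airflow 2.2+ is installed (the DAG id, command, and values are illustrative):

```python
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "example_renamed",
    max_active_tasks=10,  # replaces the deprecated `concurrency` DAG argument
    max_active_runs=2,
) as dag:
    t1 = BashOperator(
        task_id="t1",
        bash_command="echo hello",
        max_active_tis_per_dag=3,  # replaces the deprecated `task_concurrency`
    )
```

The old names still work in 2.2 but emit deprecation warnings, so new DAG files should use the renamed parameters.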

https://airflow.apache.org/docs/stable/faq.html#how-can-my-airflow-dag-run-faster

Validate answered 18/9, 2020 at 11:35 Comment(1)
The problem with these parameters is that sensor operators keep the DAG in an active state until they complete, occupying worker slots. At least in my setup, FileSensor and ExternalTaskSensor stay in the up_for_reschedule state while the DAG stays running, even though no real processing is happening. In my process, if the current DAG run fails, Airflow starts the next one, which depends on the previous run via an ExternalTaskSensor waiting for it to complete. I cannot restart the current DAG run since max_active_runs=1, and the next DAG run already occupies the available worker slot.Billiards
E
2

Check which core.executor is set in the Airflow configuration. SequentialExecutor executes tasks sequentially, so choose LocalExecutor or CeleryExecutor, which execute tasks in parallel. After that, you can use the other options mentioned by @hexacyanide

Expectorant answered 17/6, 2019 at 11:49 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.