How to pause / unpause multiple DAGs in Airflow
We have 100 DAGs whose IDs have the prefix "dag_EDW_HC_*". We use the command below to pause a single DAG:

Command: airflow pause dag_id

Is there any way to pause all 100 "dag_EDW_HC_*" DAGs in a single go (programmatically in Python, or any other way)?

Jovitta answered 6/8, 2020 at 14:8 Comment(1)
I also referred to a few links like #44360854, but I haven't found any reference for pausing multiple DAGs. – Jovitta
The absolute easiest (and likely fastest) way I can think of is to update the database directly:

UPDATE  dag
   SET  is_paused = true
 WHERE  dag_id LIKE 'dag_EDW_HC%';

Set is_paused = false in the same statement to unpause them again.
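If you would rather not issue raw SQL, a minimal equivalent sketch using Airflow's ORM session and DagModel could look like this (assuming Airflow 2.x and a machine that can reach the metadata database):

# Sketch: bulk-pause all DAGs whose id matches a prefix, via Airflow's SQLAlchemy session
from airflow import settings
from airflow.models import DagModel

session = settings.Session()
session.query(DagModel) \
    .filter(DagModel.dag_id.like("dag_EDW_HC%")) \
    .update({DagModel.is_paused: True}, synchronize_session=False)
session.commit()
session.close()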
Deprecative answered 6/8, 2020 at 15:31 Comment(6)
Thanks a lot for the reply. So it means that in the Airflow database, in the dag table, we can set that flag ourselves? – Jovitta
What happens if DAGs are currently running and we carry out this update? – Cristincristina
The same thing that would happen if you did it through any other means: new tasks/runs won't be scheduled. – Deprecative
Since Feb 2022, Google provides a composer_dags.py script to do this. Check my answer below (https://mcmap.net/q/1335680/-how-to-pause-un-pause-multiple-dags-in-airflow) on how to use it. – Larder
Hi @Deprecative, I don't have permission to directly update the Composer database; how can I update the dag table programmatically? - Santanu – Edelmiraedelson
@SantanuGhosh if you have terminal access, I'd look into xargs and the airflow pause command and do the action there. – Deprecative
The following BashOperators act on all DAGs, but you could either repeat them per DAG or filter the list with 'grep' to your liking:

BashOperator(task_id="pause_dags",
    bash_command="airflow dags list -o plain | grep -o '^\S*' | xargs -L1 -t airflow dags pause"
)
BashOperator(task_id="unpause_dags",
    bash_command="airflow dags list -o plain | grep -o '^\S*' | xargs -L1 -t airflow dags unpause"
)
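For the original question's prefix, a filtered variant could look like the sketch below (the task_id and the awk/grep filtering are illustrative assumptions, not taken from the commands above):

from airflow.operators.bash import BashOperator

# Sketch: pause only the DAGs whose id starts with dag_EDW_HC
BashOperator(task_id="pause_edw_hc_dags",
    bash_command="airflow dags list -o plain | awk '{print $1}' | grep '^dag_EDW_HC' | xargs -L1 -t airflow dags pause"
)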
Carrier answered 12/10, 2023 at 8:14 Comment(0)
If you want to do this regularly, you can create a DAG specifically for this purpose with a corresponding PythonOperator, and specify parameters when triggering the DAG. From a running task instance (in the python_callable function passed to a PythonOperator, or in the execute method of a custom operator) you have access to the DagBag object, which contains the dag ids of all DAGs loaded into the Airflow environment. You can use those ids to look up the corresponding DagModel objects, loop through them, and pause every DAG:

from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag

def python_callable():
    # Collect the ids of every DAG known to this Airflow environment
    dag_bag = DagBag(read_dags_from_db=False)
    for dag_id_ in dag_bag.dag_ids:
        # Look up the DAG's database record and flip its paused flag
        dag_model = DagModel.get_dagmodel(dag_id_)
        dag_model.set_is_paused(True)

The code above is for Airflow 2.0.1 and could differ for other versions. Check the documentation for your version of Airflow if this callable does not work for you.
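A minimal sketch of wiring such a callable into a maintenance DAG, narrowed to the question's prefix, could look like this (the dag_id pause_edw_hc_dags, the prefix argument, and the manual schedule are assumptions for illustration):

from datetime import datetime

from airflow import DAG
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
from airflow.operators.python import PythonOperator

def pause_matching_dags(prefix="dag_EDW_HC_"):
    # Pause only the DAGs whose id starts with the given prefix
    dag_bag = DagBag(read_dags_from_db=False)
    for dag_id_ in dag_bag.dag_ids:
        if dag_id_.startswith(prefix):
            DagModel.get_dagmodel(dag_id_).set_is_paused(True)

with DAG(
    dag_id="pause_edw_hc_dags",   # hypothetical maintenance DAG
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,       # run only when triggered manually
    catchup=False,
) as dag:
    PythonOperator(
        task_id="pause_dags",
        python_callable=pause_matching_dags,
    )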

Addicted answered 21/4, 2022 at 9:44 Comment(0)
Since Feb 2022, Google provides a composer_dags.py script which can be used to pause all the DAGs in a given Cloud Composer environment, using the command below:

python3 composer_dags.py --environment COMPOSER_1_ENV \
  --project PROJECT_ID \
  --location COMPOSER_1_LOCATION \
  --operation pause

source: https://cloud.google.com/composer/docs/migrate-composer-2-snapshots-af-1#step_pause_dags_in_your_environment

Larder answered 11/10, 2022 at 10:15 Comment(0)
