Airflow unpause dag programmatically?
L

9

17

I have a DAG that we'll deploy to multiple Airflow instances. In our airflow.cfg we have dags_are_paused_at_creation = True, but for this specific DAG we want it turned on without having to do so manually by clicking in the UI. Is there a way to do it programmatically?

Liberal answered 5/6, 2017 at 1:1 Comment(0)
K
10

The airflow-rest-api-plugin can also be used to pause DAGs programmatically (its README lists a matching unpause endpoint as well). The pause endpoint is documented as follows:

Pauses a DAG

Available in Airflow Version: 1.7.0 or greater

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=pause

Query Arguments:

dag_id - string - The id of the dag

subdir (optional) - string - File location or directory from which to look for the dag

Examples:

http://{HOST}:{PORT}/admin/rest_api/api?api=pause&dag_id=test_id

See for more details: https://github.com/teamclairvoyant/airflow-rest-api-plugin
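
As a minimal sketch of calling this endpoint from Python, the helper below only builds the URL in the format shown above. The function name plugin_url and its parameters are hypothetical, and passing api="unpause" assumes the plugin exposes an unpause endpoint with the same query arguments, which should be checked against the plugin's README.

```python
from urllib.parse import urlencode


def plugin_url(host, port, dag_id, api="pause", subdir=None):
    """Build a URL for the airflow-rest-api-plugin endpoint format quoted above.

    `api` defaults to "pause"; "unpause" is assumed to take the same arguments.
    """
    params = {"api": api, "dag_id": dag_id}
    if subdir is not None:
        # Optional: file location or directory from which to look for the DAG.
        params["subdir"] = subdir
    return f"http://{host}:{port}/admin/rest_api/api?{urlencode(params)}"


if __name__ == "__main__":
    print(plugin_url("localhost", 8080, "test_id"))
```

The resulting URL can then be requested with any HTTP client, for example urllib.request.urlopen(url), from a machine that can reach the Airflow webserver.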

Kola answered 1/1, 2018 at 0:31 Comment(0)
L
29

I created the following function to do so if anyone else runs into this issue:

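import airflow.settings
from airflow.models import DagModel


def unpause_dag(dag):
    """
    A way to programmatically unpause a DAG.
    :param dag: DAG object
    :return: None; the DAG's is_paused flag in the metadata database is now False
    """
    session = airflow.settings.Session()
    try:
        qry = session.query(DagModel).filter(DagModel.dag_id == dag.dag_id)
        d = qry.first()
        if d is not None:  # the DAG may not be registered in the metadata DB yet
            d.is_paused = False
            session.commit()
    except Exception:
        # Undo the partial change, then re-raise so the failure is not hidden.
        session.rollback()
        raise
    finally:
        session.close()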
Liberal answered 5/6, 2017 at 2:44 Comment(1)
You could also use the unpause(args='', dag=dag) method in airflow.bin.cli. - Winfrid
G
9

Supply your dag_id and run this command on your command line:

airflow pause dag_id

For more information on the airflow command line interface: https://airflow.incubator.apache.org/cli.html

Grunt answered 5/6, 2017 at 8:19 Comment(1)
Awesome, can't believe I missed this. It's not exactly programmatic, but I can just use a BashOperator to run it, or use the subprocess module to execute it. I also see now at github.com/apache/incubator-airflow/blob/master/airflow/bin/… that I have the same code, so I will just use this. - Liberal
R
3

I think you are looking for unpause (not pause):

airflow unpause DAG_ID
Romanfleuve answered 3/5, 2019 at 12:8 Comment(0)
B
2

The following CLI command should work, per the recent docs:

airflow dags unpause dag_id

https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#unpause
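
If the command needs to be run from Python rather than a shell, a minimal sketch using the standard subprocess module could look like this. The function names and the airflow_2 flag are hypothetical; the two command forms are the ones quoted in this thread (airflow dags unpause for newer releases, airflow unpause for older ones), and the sketch assumes the airflow executable is on the PATH of the process that runs it.

```python
import subprocess


def unpause_command(dag_id, airflow_2=True):
    """Return the CLI invocation as an argument list (no shell quoting needed)."""
    if airflow_2:
        return ["airflow", "dags", "unpause", dag_id]
    # Older CLI syntax, as used in the earlier answers.
    return ["airflow", "unpause", dag_id]


def unpause_via_cli(dag_id, airflow_2=True):
    """Run the command; check=True raises CalledProcessError on a non-zero exit."""
    return subprocess.run(unpause_command(dag_id, airflow_2), check=True)
```

Passing the arguments as a list avoids going through a shell, so a dag_id containing unusual characters is handed to the CLI unchanged.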

Base answered 4/7, 2021 at 14:22 Comment(0)
C
1

Airflow's stable REST API provides a way to do this with the DAG PATCH endpoint: update the DAG with the query parameter ?update_mask=is_paused and send a JSON body that sets the is_paused field to a boolean, e.g. {"is_paused": false}.

Ref: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/patch_dag
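
A minimal sketch of that PATCH request in Python, using only the standard library, might look like the following. The function name, base URL, and credentials are hypothetical, and the Authorization header assumes the basic-auth API backend is enabled on the instance; other auth backends would need a different header.

```python
import base64
import json
import urllib.request


def build_unpause_request(base_url, dag_id, user, password):
    """Prepare (but do not send) a PATCH request that sets is_paused to false."""
    url = f"{base_url}/api/v1/dags/{dag_id}?update_mask=is_paused"
    body = json.dumps({"is_paused": False}).encode("utf-8")
    # Assumption: the instance accepts HTTP basic auth for API calls.
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="PATCH",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


if __name__ == "__main__":
    req = build_unpause_request("http://localhost:8080", "my_dag", "admin", "secret")
    print(req.get_method(), req.full_url)
```

To actually send it, pass the request to urllib.request.urlopen(req) and inspect the response status and JSON body.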

Capacious answered 17/12, 2021 at 7:4 Comment(0)
P
1

You can do this from a PythonOperator in any DAG to pause and unpause DAGs programmatically. This is the best approach I have found that avoids the CLI: just pass the list of DAG ids and apply the following to each one.

from airflow.models import DagModel
dag_id = "dag_name"
dag = DagModel.get_dagmodel(dag_id)
dag.set_is_paused(is_paused=False)

And if you just want to check whether it is paused, read the is_paused attribute, which is a boolean (it is an attribute, not a method, so there are no parentheses):

dag.is_paused
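
As a rough sketch of the "pass the list of DAGs" idea, the helper below loops over DAG ids and unpauses each one with the DagModel calls from this answer. The name unpause_dags and the get_dagmodel parameter are hypothetical (the parameter exists only so the function can be exercised without an Airflow installation), and skipping ids with no matching row assumes get_dagmodel returns None in that case.

```python
def unpause_dags(dag_ids, get_dagmodel=None):
    """Unpause each DAG id; return the ids that were not found."""
    if get_dagmodel is None:
        # Imported lazily so this module loads even where Airflow is absent.
        from airflow.models import DagModel
        get_dagmodel = DagModel.get_dagmodel

    missing = []
    for dag_id in dag_ids:
        model = get_dagmodel(dag_id)
        if model is None:
            # No row for this DAG in the metadata database (assumed behaviour).
            missing.append(dag_id)
            continue
        model.set_is_paused(is_paused=False)
    return missing
```

Inside a DAG this could be used as the python_callable of a PythonOperator, with the list of ids passed through op_kwargs.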
Pinite answered 16/9, 2022 at 16:42 Comment(0)
F
0
airflow pause dag_id

has been discontinued in Airflow 2.

You will have to use:

airflow dags pause dag_id
Fransen answered 13/7, 2022 at 7:57 Comment(0)
S
0

A bash version of srinivasreddyramaram's comment could look like the one below. Note that you need predefined variables: $DEFAULT_PASSWORD, $ENDPOINT_URL, and $DagId.

$DEFAULT_PASSWORD is the password used to log in to the Airflow UI, $ENDPOINT_URL is the URL at which you access the UI, and $DagId is the id of the DAG to unpause.

One possible place to make these REST API calls is a bootstrap DAG, for example one that updates the airflow-providers-amazon library from version 6.0.0 to 6.1.0 because you need to assume some role ARN.

restApiResponse=$(curl \
        -s \
        --header "Content-Type: application/json" \
        --request PATCH \
        --user "admin:$DEFAULT_PASSWORD" \
        -w "%{stdout}" \
        --data '{"is_paused": false}' \
        "${ENDPOINT_URL}/api/v1/dags/$DagId?update_mask=is_paused")

echo "$restApiResponse"
Schaerbeek answered 24/2, 2023 at 14:38 Comment(0)
