I have a DAG that we'll deploy to multiple Airflow instances, and in our airflow.cfg we have dags_are_paused_at_creation = True,
but for this specific DAG we want it turned on without having to click through the UI manually. Is there a way to do it programmatically?
The airflow-rest-api-plugin can also be used to pause DAGs programmatically.
Pauses a DAG
Available in Airflow Version: 1.7.0 or greater
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=pause
Query Arguments:
dag_id - string - The id of the dag
subdir (optional) - string - File location or directory from which to look for the dag
Examples:
http://{HOST}:{PORT}/admin/rest_api/api?api=pause&dag_id=test_id
For more details, see: https://github.com/teamclairvoyant/airflow-rest-api-plugin
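As a rough sketch, the request URL described above could be assembled in Python as follows. The helper name build_pause_url and the host and port values are placeholders made up for illustration, not part of the plugin; the endpoint path and the api, dag_id and subdir arguments are the ones listed above.

```python
from urllib.parse import urlencode


def build_pause_url(host, port, dag_id, subdir=None):
    """Build the plugin's pause URL (hypothetical helper, not part of the plugin)."""
    # Query arguments as listed above: api, dag_id, and the optional subdir.
    params = {"api": "pause", "dag_id": dag_id}
    if subdir is not None:
        params["subdir"] = subdir
    return "http://{}:{}/admin/rest_api/api?{}".format(host, port, urlencode(params))


if __name__ == "__main__":
    # Placeholder host and port; substitute those of your own webserver.
    print(build_pause_url("localhost", 8080, "test_id"))
```

The resulting string can then be fetched with any HTTP client, since the plugin expects a plain GET request.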
I created the following function to do so if anyone else runs into this issue:
import airflow.settings
from airflow.models import DagModel


def unpause_dag(dag):
    """
    A way to programmatically unpause a DAG.
    :param dag: DAG object
    :return: None; the DAG's is_paused flag is set to False in the metadata database
    """
    session = airflow.settings.Session()
    try:
        # Look up the DagModel row that belongs to this DAG.
        qry = session.query(DagModel).filter(DagModel.dag_id == dag.dag_id)
        d = qry.first()
        d.is_paused = False
        session.commit()
    except Exception:
        # Undo the pending change if the update fails.
        session.rollback()
    finally:
        session.close()
Supply your dag_id and run this command on the command line:
airflow pause dag_id
For more information on the airflow command line interface: https://airflow.incubator.apache.org/cli.html
I think you are looking for unpause (not pause):
airflow unpause DAG_ID
The following CLI command should work per the recent docs:
airflow dags unpause dag_id
https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#unpause
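If the same deployment script has to cover several Airflow installations, the choice between the older and the newer command form could be wrapped in a small helper. This is only a sketch: the function name unpause_command is made up, and it assumes that the "airflow dags unpause" form applies to Airflow 2.x and the plain "airflow unpause" form to 1.x.

```python
def unpause_command(dag_id, airflow_major_version):
    """Return the CLI argument list that unpauses a DAG (hypothetical helper)."""
    if airflow_major_version >= 2:
        # Assumed Airflow 2.x form: DAG commands grouped under "airflow dags".
        return ["airflow", "dags", "unpause", dag_id]
    # Assumed Airflow 1.x form.
    return ["airflow", "unpause", dag_id]


if __name__ == "__main__":
    # The list can be handed to subprocess.run(...) on a machine with Airflow installed.
    print(" ".join(unpause_command("my_dag", 2)))
```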
Airflow's REST API supports this through the DAG patch endpoint: send a PATCH request for the DAG with the query parameter ?update_mask=is_paused
and the new boolean value of is_paused in the request body.
Ref: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/patch_dag
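A minimal sketch of such a request in Python, using only the standard library. It assumes the API is served under /api/v1/dags/<dag_id> and that basic authentication is enabled on the webserver; the helper name build_unpause_request, the base URL and the credentials are placeholders for illustration. The request is only built here, not sent.

```python
import base64
import json
import urllib.request


def build_unpause_request(base_url, dag_id, username, password):
    """Build (but do not send) a PATCH request that sets is_paused to false."""
    # update_mask limits the update to the is_paused field.
    url = "{}/api/v1/dags/{}?update_mask=is_paused".format(base_url.rstrip("/"), dag_id)
    body = json.dumps({"is_paused": False}).encode("utf-8")
    # Assumption: the webserver accepts HTTP basic authentication.
    token = base64.b64encode("{}:{}".format(username, password).encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="PATCH",
        headers={
            "Content-Type": "application/json",
            "Authorization": "Basic " + token,
        },
    )


if __name__ == "__main__":
    # Placeholder values; replace with your own endpoint and credentials.
    req = build_unpause_request("http://localhost:8080", "my_dag", "admin", "secret")
    print(req.get_method(), req.full_url)
    # To actually send it: urllib.request.urlopen(req)
```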
You can do this from a PythonOperator in any DAG to pause and unpause DAGs programmatically. This is the best approach I found instead of using the CLI: just pass the list of DAGs and the rest is taken care of.
from airflow.models import DagModel
dag_id = "dag_name"
dag = DagModel.get_dagmodel(dag_id)
dag.set_is_paused(is_paused=False)
And if you just want to check whether it is paused, read the is_paused attribute, which is a boolean:
dag.is_paused
airflow pause dag_id has been discontinued. You will have to use:
airflow dags pause dag_id
A bash version of srinivasreddyramaram's comment could look like the one below. Note that you need predefined variables such as $DEFAULT_PASSWORD, $ENDPOINT_URL, and $DagId.
The default password is the password used in the Airflow UI, the endpoint URL is the URL used to access your UI, and the DAG id is self-explanatory.
One possible way to make these REST API calls is a bootstrap DAG that updates the airflow-providers-amazon library from version 6.0.0 to 6.1.0, because you need to assume a role ARN.
# PATCH the DAG and update only the is_paused field.
restApiResponse=$(curl \
    -s \
    --header "Content-Type: application/json" \
    --request PATCH \
    --user "admin:$DEFAULT_PASSWORD" \
    -w "%{stdout}" \
    --data '{"is_paused": false}' \
    "${ENDPOINT_URL}/api/v1/dags/${DagId}?update_mask=is_paused")
echo "$restApiResponse"