Delete a DAG in google composer - Airflow UI
Asked Answered
S

3

5

I want to delete a DAG from the Airflow UI, that's not longer available in the GCS/dags folder. I know that Airflow has a "new" way to remove dags from the DB using airflow delete_dag my_dag_id command, seen in https://mcmap.net/q/217495/-airflow-how-to-delete-a-dag

It seems that in composer airflow version the delete_dag command is not yet supported.

Do not try this: I've also tried using airflow resetdb and the airflow UI died

Is there a way to delete the dags that are not currently in the gs://BUCKET/dags/ folder ?

Slowwitted answered 31/5, 2018 at 20:53 Comment(0)
C
8

I created a DAG to cleanup UI. It reads afDagID from Airflow Variables

from airflow import DAG
from airflow import models
from airflow.operators.mysql_operator import MySqlOperator
import logging
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('ManageAirFlow', description='Deletes Airflow DAGs from backend: Uses vars-  afDagID',
      schedule_interval=None,
      start_date=datetime(2018, 3, 20), catchup=False)

DeleteXComOperator = MySqlOperator(
  task_id='delete-xcom-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from xcom where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteTaskOperator = MySqlOperator(
  task_id='delete-task-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from task_instance where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteSLAMissOperator = MySqlOperator(
  task_id='delete-sla-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from sla_miss where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteLogOperator = MySqlOperator(
  task_id='delete-log-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from log where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteJobOperator = MySqlOperator(
  task_id='delete-job-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from job where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteDagRunOperator = MySqlOperator(
  task_id='delete-dag_run-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from dag_run where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteDagOperator = MySqlOperator(
  task_id='delete-dag-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from dag where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)



DeleteXComOperator >> DeleteTaskOperator >> DeleteSLAMissOperator >> DeleteLogOperator >> DeleteJobOperator >> DeleteDagRunOperator >> DeleteDagOperator
Crosse answered 15/6, 2018 at 15:8 Comment(2)
@Josh is this code still valid? How do I define afDagID? it gives me an error KeyError: 'Variable afDagID does not exist' thanks for the helpPeroration
I am the editor not the answerer! @Crosse - any tips?Adjustment
M
2

As cloud composer is using the latest stable version i.e. 1.9.0, the feature to delete dag is not available.

However,

There are few instructions in the docs to delete a dag as below:

 gcloud beta composer environments storage dags delete \
     --environment ENVIRONMENT_NAME \
     --location LOCATION \
     DAG_NAME.py 

but unfortunately, this would not remove the DAG from the Airflow web interface.

More info: https://cloud.google.com/composer/docs/how-to/using/managing-dags#deleting_a_dag

Mandamandaean answered 31/5, 2018 at 21:13 Comment(4)
do you know how to access to the cloud composer master?Slowwitted
Hey, sorry composer is a managed service hence you won't be able to access master. I have updated my answer as well.Mandamandaean
@PabloA You can access it from "Google Kubernetes Engine", so visit GKE page on google cloud, find the cluster using compiser and click on "Connect"Mandamandaean
doesn't this just go to GCS and delete the file? but the dag's remnants (history, state) still remain in Composer/Airflow?Beisel
W
0

It will be take place in 2 steps:- step1:-

First you have to delete airflow_monitoring.py file with command from storage bucket.

gcloud composer environments storage dags delete --environment viu-etl-prod-composer --location us-central1 airflow_monitoring.py

step 2:-
click on red cross check as shown in picture.

enter image description here

Weasand answered 25/4, 2020 at 8:0 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.