Dynamic dags not getting added by scheduler
I am trying to create DAGs dynamically and have the scheduler pick them up. I followed the reference at https://www.astronomer.io/guides/dynamically-generating-dags/, which works well, and changed it slightly as in the code below. I need help debugging the issue.

What I tried: a test run of the file. The DAG executes and globals() prints all the generated DAG objects, but somehow they are not listed by list_dags or shown in the UI.

from datetime import datetime, timedelta
import requests
import json
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator

def create_dag(dag_id,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval="@hourly",
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            # dag_number is already available to hello_world_py via closure;
            # PythonOperator does not accept it as a keyword argument
            python_callable=hello_world_py)

    return dag


def fetch_new_dags(**kwargs):

    for n in range(1, 10):
        print("=====================START=========\n")
        dag_id = "abcd_" + str(n)
        print(dag_id)
        print("\n")
        globals()[dag_id] = create_dag(dag_id, n, default_args)
        print(globals())

default_args = {
    'owner': 'diablo_admin',
    'depends_on_past': False,
    'start_date': datetime(2019, 8, 8),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'trigger_rule': 'none_skipped'
    #'schedule_interval': '0 * * * *'
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('testDynDags', default_args=default_args, schedule_interval='*/1 * * * *')
#schedule_interval='*/1 * * * *'

check_for_dags = PythonOperator(dag=dag,
                   task_id='tst_dyn_dag',
                   provide_context=True,
                   python_callable=fetch_new_dags
                   )

Expected the DAGs (abcd_1 through abcd_9) to be created dynamically and added to the scheduler.

Sum answered 12/8, 2019 at 13:58 Comment(0)

I guess doing the following would fix it:

  • completely remove the global testDynDags DAG and its tst_dyn_dag task (both instantiation and invocation)
  • invoke your fetch_new_dags(..) method with the requisite arguments at global (module) scope, so it runs every time the scheduler parses the file

Explanation

  • Dynamic DAGs / tasks merely mean that you have well-defined logic, at the time of writing the dag-definition file, that can create tasks / DAGs with a known structure in a pre-defined fashion.
  • You can NOT determine the structure of your DAG at runtime (task execution). So, for instance, you cannot add n identical tasks to your DAG if an upstream task returned an integer value n. But you can iterate over a YAML file containing n segments and generate n tasks / DAGs at parse time.

So clearly, wrapping DAG-generation code inside an Airflow task makes no sense: the loop only runs when the task executes on a worker, and the globals() it populates belong to that worker process, not to the scheduler's parse of the definition file.


UPDATE-1

From what is indicated in the comments, I infer that your requirement is for an external source to feed the inputs (how many DAGs or tasks to create) to your DAG / task-generation script. While this is indeed a complex use-case, a simple way to achieve it is to create 2 separate DAGs.

  • One DAG runs once in a while and generates the inputs, storing them in an external resource such as an Airflow Variable (or any other external store like a file / S3 / database etc.)
  • The second DAG is constructed programmatically by reading that same datasource which was written by the first DAG

You can take inspiration from the "Adding DAGs based on Variable value" section of that guide.

Panther answered 12/8, 2019 at 16:59 Comment(3)
Thank you Shubham. My intention is to create new DAGs with the same structure, where the inputs for the tasks in each DAG vary, including the schedule. Hence the logic, and I need testDynDags to run at a regular frequency. If I remove testDynDags and the tst_dyn_dag task, how can I keep invoking the fetch_new_dags method from external sources (since I could not find an API for this)? Currently, in the tst_dyn_dag task, I am actually calling a REST API and checking for new dag_ids that are to be created, along with the required arguments. – Sum
Thank you again Shubham. I achieved this using one DAG (which invokes the Python program) and one Python program that just creates DAGs. – Sum
@Sum can you guide me on how you achieved this? – Expressway
