Airflow Task stuck at scheduled state
In Airflow, I'm having issues running multiple DAGs at the same time. I have two DAGs: the first has 50 tasks and the second has 5, defined as follows:

dynamic_Task_Concurrency.py

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

sources = [
    {"num": i, "task_ingest_name": f"table_data{i}", "path_ingestion": "/tmp"}
    for i in range(1, 51)
]

# define the DAG
with DAG(
    "dynamic_Task_Concurrency",
    default_args={
        'owner': 'airflow',
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5)
    },

    start_date=datetime(2022, 7, 13),
    schedule_interval='0 17 * * *',
    catchup=False,
    tags=['daily'],
    max_active_runs=5,
) as dag:

    data_ingestion_start = DummyOperator(task_id="data_ingestion_start")

    with TaskGroup(group_id="group_data_ingestion") as group_data_ingestion:
        for source in sources:
            ingest_table = BashOperator(
                task_id=f"ingestion_table_{source['task_ingest_name']}",
                # the trailing space stops Airflow from treating the ".sh"
                # path as a Jinja template file to render
                bash_command=f"{source['path_ingestion']}/task_delay.sh ",
            )

    data_ingestion_end = DummyOperator(task_id="data_ingestion_end")

    data_ingestion_start >> group_data_ingestion >> data_ingestion_end

dynamic_Task_Concurrency_two.py

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

sources = [
    {"num": i, "task_ingest_name": f"table_data{i}", "path_ingestion": "/tmp"}
    for i in range(1, 6)  # 5 tasks; range(1, 5) would only create 4
]

# define the DAG
with DAG(
    "dynamic_Task_Concurrency_two",
    default_args={
        'owner': 'airflow',
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5)
    },

    start_date=datetime(2022, 7, 13),
    schedule_interval='0 17 * * *',
    catchup=False,
    tags=['daily'],
    max_active_runs=2,
) as dag:

    data_ingestion_start = DummyOperator(task_id="data_ingestion_start")

    with TaskGroup(group_id="group_data_ingestion") as group_data_ingestion:
        for source in sources:
            ingest_table = BashOperator(
                task_id=f"ingestion_table_{source['task_ingest_name']}",
                # trailing space again avoids Jinja templating of the ".sh" path
                bash_command=f"{source['path_ingestion']}/task_delay.sh ",
            )

    data_ingestion_end = DummyOperator(task_id="data_ingestion_end")

    data_ingestion_start >> group_data_ingestion >> data_ingestion_end

And in airflow.cfg I've set up:

parallelism = 36
max_active_tasks_per_dag = 12
dags_are_paused_at_creation = True
max_active_runs_per_dag = 5
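
As a rough mental model of how these caps interact (this is not Airflow's actual scheduler, which also weighs pools, priority weights, and executor slots), the arithmetic can be sketched in plain Python; `runnable_now` is a hypothetical helper, not an Airflow API:

```python
# Simplified slot arithmetic for the configuration above:
# a global pool of `parallelism` slots, capped per DAG by
# max_active_tasks_per_dag, handed out first come, first served.
PARALLELISM = 36
MAX_ACTIVE_TASKS_PER_DAG = 12

def runnable_now(queued_per_dag):
    """Return how many tasks of each DAG could run at once: capped
    per DAG first, then by the remaining global parallelism pool."""
    free = PARALLELISM
    running = {}
    for dag_id, queued in queued_per_dag.items():
        take = min(queued, MAX_ACTIVE_TASKS_PER_DAG, free)
        running[dag_id] = take
        free -= take
    return running

# Capacity-wise both DAGs fit together: 12 + 5 = 17 <= 36.
print(runnable_now({"dynamic_Task_Concurrency": 50,
                    "dynamic_Task_Concurrency_two": 5}))
# → {'dynamic_Task_Concurrency': 12, 'dynamic_Task_Concurrency_two': 5}
```

By this arithmetic alone both DAGs should run concurrently, which is why the observed behavior below is surprising.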

This is the output: dynamic_Task_Concurrency executes, while dynamic_Task_Concurrency_two stays in the scheduled state until at least some tasks of dynamic_Task_Concurrency are done. (screenshot of the Airflow UI omitted)

This is the state once some of the dynamic_Task_Concurrency tasks are done. (screenshot of the Airflow UI omitted)

The expectation is that dynamic_Task_Concurrency and dynamic_Task_Concurrency_two run at the same time, each executing up to 12 tasks.

Assemblage answered 16 Nov 2023 at 11:01
Which executor are you using? The default SequentialExecutor doesn't support concurrency, if I recall correctly. – Addlepated
In your configuration, you set max_active_runs_per_dag to 5, which means that only 5 instances of each DAG (dynamic_Task_Concurrency and dynamic_Task_Concurrency_two) can run concurrently. This is why you're observing that the second DAG is waiting for the first one to complete before starting its tasks.

To achieve running both DAGs concurrently with 12 tasks each, you need to adjust the configuration and consider the total number of tasks that can run simultaneously across all DAGs.

You can set max_active_runs_per_dag to a higher value like 10 to allow more instances of each DAG to run concurrently. Also, since you want to run 12 tasks simultaneously in each DAG and you have two DAGs, the total concurrency should be at least 24. Finally, you might want to ensure that parallelism and max_active_tasks_per_dag are set to values that accommodate the total concurrency you want to achieve.

You can adjust your airflow.cfg to something like this:

parallelism = 24
max_active_tasks_per_dag = 12
dags_are_paused_at_creation = True
max_active_runs_per_dag = 10
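
If you would rather not raise the global defaults for every DAG, the same caps can be set per DAG (assuming Airflow 2.2+, where the DAG constructor accepts `max_active_tasks`; older versions called this `concurrency`). A minimal configuration sketch, not a full DAG file:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    "dynamic_Task_Concurrency_two",
    start_date=datetime(2022, 7, 13),
    schedule_interval="0 17 * * *",
    catchup=False,
    max_active_runs=2,    # runs of this DAG that may be active at once
    max_active_tasks=12,  # tasks of this DAG that may run at once;
                          # overrides max_active_tasks_per_dag from airflow.cfg
) as dag:
    DummyOperator(task_id="data_ingestion_start")
```

This keeps the per-DAG limits next to the DAG definition instead of in airflow.cfg, which is easier to reason about when different DAGs need different caps.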
Glossolalia answered 7 Apr at 17:33
Did I misunderstand the question? Didn't he set max_active_tasks_per_dag to 12 and parallelism to 36 in the post? Or did he edit it and I'm missing the point? – Emmuela
I think the OP edited the post. At the top, right below the title "Airflow Task stuck at scheduled state," it says "Modified today". – Glossolalia