In Airflow, I have an issue running multiple tasks concurrently. I have two DAGs: the first has 50 tasks to run and the second has 5 tasks, as follows:
dynamic_Task_Concurrency.py
# NOTE: the imports for DAG, DummyOperator, BashOperator, TaskGroup, datetime
# and timedelta are not included in this snippet; the file must provide them.

# One ingestion task per source table: table_data1 .. table_data50 (50 tasks).
sources = [
    {"num": i, "task_ingest_name": f"table_data{i}", "path_ingestion": "/tmp"}
    for i in range(1, 51)
]

# define the DAG
with DAG(
    "dynamic_Task_Concurrency",
    default_args={
        'owner': 'airflow',
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5),
    },
    start_date=datetime(2022, 7, 13),
    schedule_interval='0 17 * * *',
    catchup=False,
    tags=['daily'],
    max_active_runs=5,
    # NOTE(review): no per-DAG max_active_tasks is set here, so the
    # airflow.cfg value (max_active_tasks_per_dag) presumably applies — confirm.
) as dag:
    data_ingestion_start = DummyOperator(task_id="data_ingestion_start")

    # Operators created inside the `with DAG(...)` / `with TaskGroup(...)`
    # contexts are attached to this DAG and group automatically, so no
    # explicit `dag=dag` argument is needed.
    with TaskGroup(group_id="group_data_ingestion") as group_data_ingestion:
        for source in sources:
            BashOperator(
                task_id=f"ingestion_table_{source['task_ingest_name']}",
                # The trailing space is deliberate: without it, Airflow treats a
                # bash_command ending in ".sh" as a Jinja template file to load.
                bash_command=f"{source['path_ingestion']}/task_delay.sh ",
            )

    data_ingestion_end = DummyOperator(task_id="data_ingestion_end")

    # Fan out: start -> every ingestion task in the group -> end.
    data_ingestion_start >> group_data_ingestion >> data_ingestion_end
dynamic_Task_Concurrency_two.py
# NOTE: the imports for DAG, DummyOperator, BashOperator, TaskGroup, datetime
# and timedelta are not included in this snippet; the file must provide them.

# One ingestion task per source table: table_data1 .. table_data5 (5 tasks).
# range's upper bound is exclusive, so 6 is needed to produce 5 tasks
# (the previous range(1, 5) only produced 4).
sources = [
    {"num": i, "task_ingest_name": f"table_data{i}", "path_ingestion": "/tmp"}
    for i in range(1, 6)
]

# define the DAG
with DAG(
    "dynamic_Task_Concurrency_two",
    default_args={
        'owner': 'airflow',
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5),
    },
    start_date=datetime(2022, 7, 13),
    schedule_interval='0 17 * * *',
    catchup=False,
    tags=['daily'],
    max_active_runs=2,
    # NOTE(review): no per-DAG max_active_tasks is set here, so the
    # airflow.cfg value (max_active_tasks_per_dag) presumably applies — confirm.
) as dag:
    data_ingestion_start = DummyOperator(task_id="data_ingestion_start")

    # Operators created inside the `with DAG(...)` / `with TaskGroup(...)`
    # contexts are attached to this DAG and group automatically, so no
    # explicit `dag=dag` argument is needed.
    with TaskGroup(group_id="group_data_ingestion") as group_data_ingestion:
        for source in sources:
            BashOperator(
                task_id=f"ingestion_table_{source['task_ingest_name']}",
                # The trailing space is deliberate: without it, Airflow treats a
                # bash_command ending in ".sh" as a Jinja template file to load.
                bash_command=f"{source['path_ingestion']}/task_delay.sh ",
            )

    data_ingestion_end = DummyOperator(task_id="data_ingestion_end")

    # Fan out: start -> every ingestion task in the group -> end.
    data_ingestion_start >> group_data_ingestion >> data_ingestion_end
In airflow.cfg, I've set:
parallelism = 36
max_active_tasks_per_dag = 12
dags_are_paused_at_creation = True
max_active_runs_per_dag = 5
In the output, dynamic_Task_Concurrency is running, while dynamic_Task_Concurrency_two stays in the scheduled state until at least some tasks of dynamic_Task_Concurrency are done.
This is the state once some tasks of dynamic_Task_Concurrency are done.
My expectation is that dynamic_Task_Concurrency and dynamic_Task_Concurrency_two run at the same time, with 12 tasks running in parallel.