Airflow TaskFlow - run tasks in parallel
While trying out the new TaskFlow API, I came to the point where I need to have 2 parallel tasks.

With Airflow v1 I used to do something like:

task_1 >> [task_2, task_3]
[task_2, task_3] >> task_4

The way we call tasks is different now compared to PythonOperator.

How can I express the list dependency with TaskFlow?

Thanks

Semifinal answered 24/4, 2021 at 7:19 Comment(0)

If each task depends on the value from the previous task, you can achieve it like this:

from airflow.utils.dates import days_ago
from airflow.decorators import task, dag


@task
def task_1():
    return 'first task'

@task
def task_2(value):
    return 'second task'

@task
def task_3(value):
    return 'third task'

@task
def task_4(value1, value2):
    return 'fourth task'

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}


@dag(dag_id='taskflow_stackoverflow', schedule_interval='@once', default_args=default_args, catchup=False)
def my_dag():
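    # task_1's XCom result is passed to task_2 and task_3, so both depend on
    # task_1 and run in parallel; task_4 then waits for both of them.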
    op_1 = task_1()
    op_2 = task_2(op_1)
    op_3 = task_3(op_1)
    op_4 = task_4(op_2, op_3)

dag = my_dag()

(Graph view: task_1 fans out to task_2 and task_3, which both feed into task_4.)

The syntax you mentioned is also supported, but you won't get direct access to the XCom values from the previous tasks:

@task
def task_1():
    return 'first task'

@task
def task_2():
    return 'second task'

@task
def task_3():
    return 'third task'

@task
def task_4():
    return 'fourth task'

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}


@dag(dag_id='taskflow_stackoverflow', schedule_interval='@once', default_args=default_args, catchup=False)
def my_dag():

    op_1 = task_1()
    op_2 = task_2()
    op_3 = task_3()
    op_4 = task_4()

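    # Explicit ordering, same style as Airflow v1; no XCom values are passed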
    op_1 >> [op_2, op_3]
    [op_2, op_3] >> op_4

dag = my_dag()

You will probably need to mix the two syntax options depending on what you want to achieve.
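For example, here is a minimal sketch of mixing both styles (the task names and dag_id are made up for illustration): transform receives the XCom value from extract as an argument, while validate is wired in purely for ordering with >>:

from airflow.utils.dates import days_ago
from airflow.decorators import task, dag


@task
def extract():
    return 'raw data'

@task
def validate():
    # ordering-only task: its return value is not consumed downstream
    return 'validation done'

@task
def transform(value):
    return f'transformed: {value}'


default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}


@dag(dag_id='taskflow_mixed_example', schedule_interval='@once', default_args=default_args, catchup=False)
def mixed_dag():
    raw = extract()
    check = validate()
    result = transform(raw)  # data dependency: extract >> transform (value passed via XCom)
    check >> result          # ordering-only dependency: validate >> transform, no value passed

dag = mixed_dag()

Both kinds of dependencies can be attached to the same task, so you can pass values where you need them and use >> for the rest.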

Contradictory answered 27/4, 2021 at 11:37 Comment(0)
