Apache Airflow - get all parent task_ids
Suppose the following situation:

[c1, c2, c3] >> child_task

where c1, c2, c3 and child_task are all operators, with task_ids id1, id2, id3 and child_id respectively.

Task child_task is also a PythonOperator with provide_context=True and python_callable=dummy_func

def dummy_func(**context):
    #...

Is it possible to retrieve all the parents' ids inside dummy_func (perhaps by inspecting the DAG via the context)?

Expected result in this case would be a list ['id1', 'id2', 'id3'].

Exchequer answered 16/2, 2019 at 23:0 Comment(0)
The upstream_task_ids and downstream_task_ids properties of BaseOperator are meant just for this purpose.

from typing import List
...
parent_task_ids: List[str] = my_task.upstream_task_ids
child_task_ids: List[str] = my_task.downstream_task_ids

Do note, however, that these properties only give you the immediate upstream/downstream neighbour(s) of a task. To get all ancestor or descendant tasks, you can apply the good old graph-theory approach, such as this BFS-like implementation:

from typing import List, Set
from queue import Queue
from airflow.models import BaseOperator

def get_ancestor_tasks(my_task: BaseOperator) -> List[BaseOperator]:
    ancestor_task_ids: Set[str] = set()
    tasks_queue: Queue = Queue()
    # seed the queue with the immediate parent tasks to begin the BFS
    for task in my_task.upstream_list:
        tasks_queue.put(task)
    # perform BFS, skipping tasks that were already visited
    while not tasks_queue.empty():
        task: BaseOperator = tasks_queue.get()
        if task.task_id in ancestor_task_ids:
            continue
        ancestor_task_ids.add(task.task_id)
        for _task in task.upstream_list:
            tasks_queue.put(_task)
    # convert the collected task_ids back to actual tasks
    ancestor_tasks: List[BaseOperator] = [
        task for task in my_task.dag.tasks if task.task_id in ancestor_task_ids
    ]
    return ancestor_tasks

The above snippet is NOT tested, but I'm sure you can take inspiration from it.
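To sanity-check the traversal logic without a running Airflow installation, the same BFS can be exercised against a minimal stand-in class. The `FakeTask` class below is hypothetical and only mimics the `task_id`, `upstream_list`, and `>>` dependency syntax that the algorithm relies on:

```python
from queue import Queue


class FakeTask:
    """Hypothetical stand-in mimicking the BaseOperator members used above."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_list = []

    def __rshift__(self, other):
        # emulate Airflow's `a >> b` dependency syntax
        other.upstream_list.append(self)
        return other


def get_ancestor_task_ids(my_task):
    """BFS over upstream_list, collecting every ancestor task_id."""
    ancestor_task_ids = set()
    tasks_queue = Queue()
    for task in my_task.upstream_list:
        tasks_queue.put(task)
    while not tasks_queue.empty():
        task = tasks_queue.get()
        if task.task_id in ancestor_task_ids:
            continue
        ancestor_task_ids.add(task.task_id)
        for _task in task.upstream_list:
            tasks_queue.put(_task)
    return ancestor_task_ids


# build a chain id1 >> id2 >> child_id plus a second parent id3 >> child_id
t1, t2, t3, child = (FakeTask(i) for i in ("id1", "id2", "id3", "child_id"))
t1 >> t2 >> child
t3 >> child
print(sorted(get_ancestor_task_ids(child)))  # prints ['id1', 'id2', 'id3']
```

Because visited task_ids are skipped before re-enqueueing, shared ancestors in diamond-shaped dependency graphs are processed only once.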



Jansson answered 17/2, 2019 at 0:1 Comment(1)
Immediate neighbours are sufficient in my case. Thanks! Exchequer

You could do something like the following to get all upstream task ids from within the DAG. This should be helpful for anyone looking for a working answer:

def get_all_upstream_task_ids(task):
    upstream_task_ids = set()
    # recurse through each immediate parent, accumulating its task_id
    # and the task_ids of all of its ancestors
    for upstream_task in task.get_direct_relatives(upstream=True):
        upstream_task_ids.add(upstream_task.task_id)
        upstream_task_ids.update(get_all_upstream_task_ids(upstream_task))
    return list(upstream_task_ids)

get_all_upstream_task_ids(context['task'])
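The same recursive logic can be verified outside Airflow with a small stand-in object. The `StubTask` class below is hypothetical; only the `task_id` attribute and the `get_direct_relatives(upstream=True)` call mirror the real operator API:

```python
class StubTask:
    """Hypothetical stand-in exposing only the two members the function uses."""

    def __init__(self, task_id, parents=()):
        self.task_id = task_id
        self._parents = list(parents)

    def get_direct_relatives(self, upstream=False):
        # mirror the real method's signature: upstream=True yields parents
        return self._parents if upstream else []


def get_all_upstream_task_ids(task):
    upstream_task_ids = set()
    for upstream_task in task.get_direct_relatives(upstream=True):
        upstream_task_ids.add(upstream_task.task_id)
        upstream_task_ids.update(get_all_upstream_task_ids(upstream_task))
    return list(upstream_task_ids)


# diamond graph: id1 feeds both id2 and id3, which both feed child_id
a = StubTask("id1")
b = StubTask("id2", parents=[a])
c = StubTask("id3", parents=[a])
child = StubTask("child_id", parents=[b, c])
print(sorted(get_all_upstream_task_ids(child)))  # prints ['id1', 'id2', 'id3']
```

Note that on a diamond-shaped graph the recursion visits the shared ancestor once per path, but the set deduplicates the result; for very deep or heavily diamond-shaped DAGs the BFS approach above avoids that redundant work.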
Ordinary answered 6/9, 2023 at 4:31 Comment(0)

A modern approach with Airflow 2

from airflow.decorators import dag, task

...

@task
def task1():
    return "x" 

@task
def task2():
    return "y"

@task
def print_task_ids(x, y, **context):
    dag_run = context["dag_run"]
    tis = dag_run.get_task_instances()

    for ti in tis:
        print(ti.task_id)


@dag(**dag_args)
def show_task_ids():
    print_task_ids(task1(), task2())
    

dag = show_task_ids()
Sialagogue answered 23/9, 2022 at 20:36 Comment(1)
Your answer does not even use the x, y parameters that are crucial for getting the correct answer. Wryneck
