Airflow - How to pass xcom variable into Python function

I need to reference a variable that's returned by a BashOperator. In my task_archive_s3_file, I need to get the filename from get_s3_file. The task simply prints {{ ti.xcom_pull(task_ids=submit_file_to_spark) }} as a string instead of the value.

If I use the bash_command, the value prints correctly.

get_s3_file = PythonOperator(
    task_id='get_s3_file',
    python_callable=obj.func_get_s3_file,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag)

submit_file_to_spark = BashOperator(
    task_id='submit_file_to_spark',
    bash_command="echo 'hello world'",
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag)

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
#    bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
    python_callable=obj.func_archive_s3_file,
    params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
    dag=dag)

get_s3_file >> submit_file_to_spark >> task_archive_s3_file
Gynarchy answered 5/9, 2017 at 15:59 Comment(0)

Templates like {{ ti.xcom_pull(...) }} can only be used inside of parameters that support templates or they won't be rendered prior to execution. See the template_fields, template_fields_renderers and template_ext attributes of the PythonOperator and BashOperator.
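
A quick way to see which arguments are templated is to inspect template_fields directly. A minimal sketch, assuming the Airflow 1.x import paths used elsewhere on this page (the exact tuples vary by version):

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Only the arguments listed here are rendered by Jinja before execution.
print(PythonOperator.template_fields)  # e.g. ('templates_dict', 'op_args', 'op_kwargs')
print(BashOperator.template_fields)    # e.g. ('bash_command', 'env')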

So templates_dict is what you use to pass templates to your python operator:

def func_archive_s3_file(**context):
    archive(context['templates_dict']['s3_path_filename'])

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,  # must pass this because templates_dict gets passed via context
    templates_dict={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}" })

However, in the case of fetching an XCom value, another alternative is just using the TaskInstance object made available to you via context:

def func_archive_s3_file(**context):
    archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True)
Drongo answered 5/9, 2017 at 18:22 Comment(5)
Dumb question but if I use the last example via the xcom_pull, would it re-run said task? I was under the assumption that XComs get passed from task to task (in order). In my example, I need the filename that is given from the very first task.Gynarchy
Nope it would not re-run the task. XCom push/pull just adds/retrieves a row from the xcom table in the airflow DB based on DAG id, execution date, task id, and key. Declaring the dependency of submit_file_to_spark >> task_archive_s3_file like you already have should be sufficient to ensure that the filename is pushed into xcom before it is retrieved.Drongo
Too bad you can only pass strings this way. I want to pass objects.Recruitment
@Julio you technically can send pickled objects via XCom, but it will be deprecated in Airflow 2.0 due to security concerns, see related enable_xcom_pickling config for more details.Drongo
@tatlar can u please have a look at similar issue: #67632081Bluma

Upvoted both the question and the answer, but I think this can be made a little clearer for users who just want to pass small data objects between PythonOperator tasks in their DAGs. Referencing this question and this XCom example got me to the following solution. Super simple:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='example_dag',
    start_date=datetime.now(),
    schedule_interval='@once'
)

def push_function(**kwargs):
    ls = ['a', 'b', 'c']
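    # returning a value pushes it to XCom automatically (under the key 'return_value')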
    return ls

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=dag)

def pull_function(**kwargs):
    ti = kwargs['ti']
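    # fetch the value push_task returned (its XCom under 'return_value')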
    ls = ti.xcom_pull(task_ids='push_task')
    print(ls)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=dag)

push_task >> pull_task

I'm not sure why this works, but it does. A few questions for the community:

  • What's happening with ti here? How is that built in to **kwargs?
  • Is provide_context=True necessary for both functions?

Any edits to make this answer clearer are very welcome!

Shrill answered 18/10, 2017 at 21:2 Comment(9)
This works because any value a task returns is stored in XCom (source). So any return value of a PythonOperator is saved to XCom (something to be careful with!). provide_context is required to use the referenced **kwargs, which I usually name **context. This context is the same context you get in Jinja templates (source). Context provides a lot of useful information specific to a DAG run.Drongo
@DanielHuang One more question: is provide_context necessary for both the push and pull, or only for the push?Shrill
Both! To push or pull, you need access to the TaskInstance object of the current run, which is only available through context.Drongo
Do you have an example where ls = ['a', 'b', 'c'] is not hard-coded, e.g. where it comes from params/conf?Hydrolysis
"ti" is the task instance object. As returned by the first taskHileman
@Shrill can u please have a look at similar issue #67632081Bluma
BTW, in a sub-DAG you have to add dag_id='master_id' to xcom_pull.Barbiturism
Hi. When one calls ti.xcom_pull(task_ids='push_task'), is this the instance of the task receiving the information or the task that output that information? Sorry if it's too lame of a question.Tropous
Hi @Mangostino, when you invoke xcom_pull, ti is the instance of the task pulling metadata from the DAG context layer. In push_task we populate the DAG context with information from push_task's internal state, and then pull it into the internal state of pull_task.Shrill

I used the same code as above and modified parameters like start_date, etc.

import airflow
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='simple_xcom',
    default_args=args,
    # start_date=datetime(2019, 4, 21),
    schedule_interval='@daily',
    # schedule_interval=timedelta(1)
)

def push_function(**context):
    msg='the_message'
    print("message to push: '%s'" % msg)
    task_instance = context['task_instance']
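    # push under an explicit key instead of relying on the return value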
    task_instance.xcom_push(key="the_message", value=msg)

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=dag)

def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='push_task',key='the_message')
    print("received message: '%s'" % msg)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag)

push_task >> pull_task

If you're wondering where context['task_instance'] and kwargs['ti'] come from, refer to the Airflow macros documentation.
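
As a quick illustration (a sketch, not part of the original answer), printing the context keys from any callable run with provide_context=True shows both names:

def show_context(**context):
    # 'ti' is an alias for 'task_instance'; both reference the current
    # TaskInstance. The full set of keys varies by Airflow version.
    print(sorted(context.keys()))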

Pastiness answered 19/1, 2018 at 4:7 Comment(5)
where do those names ti and 'task_instance' come from?Rate
@LiuWeibo See Airflow macros: airflow.apache.org/code.html#macrosHendrik
Airflow Macro link updated: airflow.apache.org/docs/stable/macros-ref.htmlSummerly
@Kiwy can u please have a look at similar issue #67632081Bluma
ti and task_instance are both the sameVasty

In Airflow 2.0 (released December 2020), the TaskFlow API has made passing XComs easier. With this API, you can simply return values from functions annotated with @task, and they will be passed as XComs behind the scenes. Example from the tutorial:

    @task()
    def extract():
        ...
        return order_data_dict
    
    @task()
    def transform(order_data_dict: dict):
        ...
        return total_order_value

    order_data = extract()
    order_summary = transform(order_data)

In this example, order_data has type XComArg. It stores the dictionary returned by the extract task. When the transform task runs, order_data is unwrapped, and the task receives the plain Python object that was stored.
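
For reference, a minimal self-contained sketch of the same pattern (the DAG id and sample values are illustrative, not from the tutorial):

    from datetime import datetime

    from airflow.decorators import dag, task

    @dag(schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False)
    def xcom_taskflow_demo():

        @task()
        def extract():
            # The returned dict is stored as an XCom automatically
            return {"1001": 301.27, "1002": 433.21}

        @task()
        def transform(order_data_dict: dict):
            # Receives the plain dict, unwrapped from the XComArg
            return sum(order_data_dict.values())

        transform(extract())

    xcom_taskflow_demo()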

Lofty answered 2/1, 2022 at 3:28 Comment(0)

If you want to pass an XCom to a bash operator in Airflow 2, use env. Say you have pushed an XCom with the key my_xcom_var; you can then use Jinja inside env to pull the value, e.g.

BashOperator(
    task_id='mytask',
    bash_command="echo ${MYVAR}",
    env={"MYVAR": "{{ ti.xcom_pull(key='my_xcom_var') }}"},
    dag=dag
)
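
For completeness, here is a sketch of how my_xcom_var might be pushed upstream; the task id, value, and callable name are hypothetical:

from airflow.operators.python import PythonOperator

def push_it(ti=None, **kwargs):
    # In Airflow 2 the 'ti' (TaskInstance) argument is injected from the
    # context automatically; provide_context is no longer needed.
    ti.xcom_push(key='my_xcom_var', value='some value')

push_task = PythonOperator(task_id='push_it', python_callable=push_it, dag=dag)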

Check https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/bash/index.html#module-airflow.operators.bash for more details

Glycogenesis answered 17/1, 2022 at 11:14 Comment(0)

The Airflow BaseOperator defines a property output that you can use to access the XCom content of the given operator. Here is a concrete example:

with DAG(...):
    push_task = PythonOperator(
        task_id='push_task', 
        python_callable=lambda: 'Hello, World!')

    pull_task = PythonOperator(
        task_id='pull_task', 
        python_callable=lambda x: print(x),
        op_args=[push_task.output])

which should be almost equivalent to

with DAG(...):
    push_task = PythonOperator(
        task_id='push_task', 
        python_callable=lambda: 'Hello, World!')

    pull_task = PythonOperator(
        task_id='pull_task', 
        python_callable=lambda x: print(x),
        op_args=["{{ task_instance.xcom_pull('push_task') }}"])

As far as I know, the only difference is that the former implicitly defines push_task >> pull_task.

Scrod answered 17/1, 2023 at 15:44 Comment(0)
