How to parse a JSON string in an Airflow template
Is it possible to parse a JSON string inside an Airflow template?

I have an HttpSensor that monitors a job via a REST API, but the job ID is in the response of the upstream task, which has xcom_push set to True.

I would like to do something like the following; however, this code raises the error jinja2.exceptions.UndefinedError: 'json' is undefined

t1 = SimpleHttpOperator(
    http_conn_id="s1",
    task_id="job",
    endpoint="some_url",
    method='POST',
    data=json.dumps({ "foo": "bar" }),
    xcom_push=True,
    dag=dag,
)

t2 = HttpSensor(
    http_conn_id="s1",
    task_id="finish_job",
    endpoint="job/{{ json.loads(ti.xcom_pull(\"job\")).jobId }}",
    response_check=lambda response: True if response.json().state == "complete" else False,
    poke_interval=5,
    dag=dag
)

t2.set_upstream(t1)
Doorway answered 27/11, 2017 at 8:0

You can add a custom Jinja filter to your DAG with the user_defined_filters parameter to parse the JSON. The Airflow documentation describes this parameter as:

"a dictionary of filters that will be exposed in your jinja templates. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to {{ 'world' | hello }} in all jinja templates related to this DAG."

dag = DAG(
    ...
    user_defined_filters={'fromjson': lambda s: json.loads(s)},
)

t1 = SimpleHttpOperator(
    task_id='job',
    xcom_push=True,
    ...
)

t2 = HttpSensor(
    endpoint='job/{{ (ti.xcom_pull("job") | fromjson)["jobId"] }}',
    ...
)
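
The filter itself is plain Python, so it can be checked outside Airflow. Here is a minimal sketch, assuming the upstream response body looks like the hypothetical sample_body below. The fromjson function mirrors the lambda above, and the endpoint line only imitates what the template expression evaluates to; it does not run Jinja or Airflow.

```python
import json


def fromjson(text):
    """Parse a JSON string; same behavior as the lambda registered above."""
    return json.loads(text)


# Hypothetical response body pushed to XCom by the upstream task.
sample_body = '{"jobId": "abc123", "state": "queued"}'

# Plain-Python equivalent of:
#   'job/{{ (ti.xcom_pull("job") | fromjson)["jobId"] }}'
endpoint = "job/{}".format(fromjson(sample_body)["jobId"])
print(endpoint)
```

A named function like this can be registered the same way as the lambda, e.g. user_defined_filters={'fromjson': fromjson}.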

However, it may be cleaner to write your own custom JsonHttpOperator plugin (or add a flag to SimpleHttpOperator) that parses the JSON before returning it, so that you can directly reference {{ ti.xcom_pull("job")["jobId"] }} in the template.

class JsonHttpOperator(SimpleHttpOperator):

    def execute(self, context):
        text = super(JsonHttpOperator, self).execute(context)
        return json.loads(text)
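
To see the override pattern in isolation, here is a sketch that swaps SimpleHttpOperator for a hypothetical stand-in class (FakeHttpOperator, which is not part of Airflow) whose execute method returns a fixed response body. Only the subclassing logic is meant to carry over; the class names and the sample body are assumptions for illustration.

```python
import json


class FakeHttpOperator:
    """Hypothetical stand-in for SimpleHttpOperator: returns a canned response body."""

    def __init__(self, body):
        self.body = body

    def execute(self, context):
        return self.body


class JsonFakeHttpOperator(FakeHttpOperator):
    """Parse the parent's text result so downstream consumers receive a dict."""

    def execute(self, context):
        text = super().execute(context)
        return json.loads(text)


# The value returned by execute is what would end up in XCom.
result = JsonFakeHttpOperator('{"jobId": "abc123"}').execute(context={})
print(result["jobId"])
```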
Visitor answered 27/11, 2017 at 21:57

Alternatively, you can expose the json module to templates through user_defined_macros, which makes json available for use inside the template. However, it is probably a better idea to create a plugin, as Daniel suggested.

dag = DAG(
    'dagname',
    default_args=default_args,
    schedule_interval="@once",
    user_defined_macros={
        'json': json
    }
)

Then use it in the template:

finish_job = HttpSensor(
    task_id="finish_job",
    endpoint="kue/job/{{ json.loads(ti.xcom_pull('job'))['jobId'] }}",
    response_check=lambda response: True if response.json()['state'] == "complete" else False,
    poke_interval=5,
    dag=dag
)
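
The response_check lambda can also be written as a named function, which is easier to test on its own. This is a sketch, assuming the status endpoint returns a JSON object with a state field as in the snippet above. FakeResponse is a hypothetical stand-in for the HTTP response object and exposes only the json() method.

```python
import json


def job_is_complete(response):
    """Return True once the job reports state == "complete"."""
    # .get() treats a missing "state" field as "not complete" instead of
    # raising KeyError (an assumption about the desired behavior).
    return response.json().get("state") == "complete"


class FakeResponse:
    """Hypothetical stand-in exposing only the .json() method used above."""

    def __init__(self, body):
        self._body = body

    def json(self):
        return json.loads(self._body)


print(job_is_complete(FakeResponse('{"state": "complete"}')))
print(job_is_complete(FakeResponse('{"state": "running"}')))
```

The function can then be passed to the sensor as response_check=job_is_complete.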
Doorway answered 28/11, 2017 at 10:14

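Another, newer option is to set render_template_as_native_obj=True at the DAG level (available since Airflow 2.1), so that templates render to native Python objects instead of strings. As far as I can tell, the subscript in the template below still requires the XCom value itself to be a dict rather than a raw JSON string.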

dag = DAG(
    'dagname',
    default_args=default_args,
    schedule_interval="@once",
    render_template_as_native_obj=True   # <------
)

t1 = SimpleHttpOperator(
    task_id='job',
    xcom_push=True,
    ...
)

t2 = HttpSensor(
    endpoint='job/{{ ti.xcom_pull("job")["jobId"] }}',  # <-----
    ...
)

For reference, see the Airflow documentation on rendering fields as native Python objects.

Minutia answered 18/4, 2023 at 18:42