Airflow: How to pass data from a decorated task to SimpleHttpOperator?

I recently started using Apache Airflow. I am using the TaskFlow API, with one decorated task with id Get_payload and a SimpleHttpOperator. The Get_payload task gets data from a database, does some data manipulation, and returns a dict as the payload.

Problem

I am unable to pass data from the previous task into the next task. Yes, I am aware of XComs, but the whole purpose of using the TaskFlow API is to avoid direct interaction with XComs. I get the error below when get_data is passed directly to the data property of SimpleHttpOperator.

airflow.exceptions.AirflowException: 400:BAD REQUEST

What have I tried so far?

As mentioned in this SO answer, I used template_fields in my custom sensor to declare the field that should receive data from the previous task. In the case of SimpleHttpOperator I cannot edit the operator class to do the same. So how do I solve this for SimpleHttpOperator?
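For reference, the pattern from that answer looks roughly like this: a custom operator opts a field into template rendering by listing it in template_fields, so whatever the upstream task returns is resolved into that field at runtime. This is a minimal sketch (the operator name and field are made up, not my actual sensor):

from airflow.models.baseoperator import BaseOperator


class MyCustomOperator(BaseOperator):
    # Fields listed here are rendered by Airflow before execute() runs,
    # so TaskFlow outputs and Jinja templates passed to them resolve
    # to their actual values at runtime.
    template_fields = ("data",)

    def __init__(self, data=None, **kwargs):
        super().__init__(**kwargs)
        self.data = data

    def execute(self, context):
        # By this point self.data holds the rendered upstream value.
        self.log.info("Received payload: %s", self.data)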

I have checked this SO answer and this one as well.

DAG:

from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator

from datetime import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
}


@dag(default_args=default_args, schedule_interval=None, tags=["Http Operators"])
def http_operator():
    @task(multiple_outputs=True)
    def Get_payload(**kwargs):
        # STEP 1: Get data from database.

        # STEP 2: Manipulate data.

        # STEP 3: Return payload.
        data = {
            "key_1": "Value 1",
            "key_2": "Value 2",
            "key_3": "Value 3",
            "key_4": "Value 4",
        }

        return data

    get_data = Get_payload()

    ml_api = SimpleHttpOperator(
        task_id="some_api",
        http_conn_id="http_conn_id",
        method="POST",
        endpoint="/some-path",
        data=get_data,
        headers={"Content-Type": "application/json"},
    )

    get_data >> ml_api


http_operator_dag = http_operator()

Full log:

[2021-08-28 20:28:12,947] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:1094} INFO - 
--------------------------------------------------------------------------------
[2021-08-28 20:28:12,971] {taskinstance.py:1095} INFO - Starting attempt 1 of 1
[2021-08-28 20:28:12,971] {taskinstance.py:1096} INFO - 
--------------------------------------------------------------------------------
[2021-08-28 20:28:12,982] {taskinstance.py:1114} INFO - Executing <Task(SimpleHttpOperator): clf_api> on 2021-08-28T20:28:10.265689+00:00
[2021-08-28 20:28:12,987] {standard_task_runner.py:52} INFO - Started process 19229 to run task
[2021-08-28 20:28:12,991] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'http_operator', 'clf_api', '2021-08-28T20:28:10.265689+00:00', '--job-id', '71', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/Http_Operator.py', '--cfg-path', '/tmp/tmp4l9hwi4q', '--error-file', '/tmp/tmpk1yrhtki']
[2021-08-28 20:28:12,993] {standard_task_runner.py:77} INFO - Job 71: Subtask clf_api
[2021-08-28 20:28:13,048] {logging_mixin.py:109} INFO - Running <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [running]> on host d332abee08c8
[2021-08-28 20:28:13,126] {taskinstance.py:1251} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=http_operator
AIRFLOW_CTX_TASK_ID=clf_api
AIRFLOW_CTX_EXECUTION_DATE=2021-08-28T20:28:10.265689+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-28T20:28:10.265689+00:00
[2021-08-28 20:28:13,128] {http.py:111} INFO - Calling HTTP method
[2021-08-28 20:28:13,141] {base.py:70} INFO - Using connection to: id: ML_API. Host: <IP-REMOVED>, Port: None, Schema: , Login: dexter, Password: ***, extra: {}
[2021-08-28 20:28:13,144] {http.py:140} INFO - Sending 'POST' to url: http://<IP-REMOVED>/classify
[2021-08-28 20:28:13,841] {http.py:154} ERROR - HTTP error: BAD REQUEST
[2021-08-28 20:28:13,842] {http.py:155} ERROR - <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>400 Bad Request</title>
<h1>Bad Request</h1>
<p>Failed to decode JSON object: Expecting value: line 1 column 1 (char 0)</p>

[2021-08-28 20:28:13,874] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 152, in check_response
    response.raise_for_status()
  File "/home/airflow/.local/lib/python3.8/site-packages/requests/models.py", line 953, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: BAD REQUEST for url: http://<IP-REMOVED>/classify

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/operators/http.py", line 113, in execute
    response = http.run(self.endpoint, self.data, self.headers, self.extra_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 141, in run
    return self.run_and_check(session, prepped_request, extra_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 198, in run_and_check
    self.check_response(response)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 156, in check_response
    raise AirflowException(str(response.status_code) + ":" + response.reason)
airflow.exceptions.AirflowException: 400:BAD REQUEST
[2021-08-28 20:28:13,882] {taskinstance.py:1505} INFO - Marking task as FAILED. dag_id=http_operator, task_id=clf_api, execution_date=20210828T202810, start_date=20210828T202812, end_date=20210828T202813
[2021-08-28 20:28:13,969] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-08-28 20:28:14,043] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

Suzetta answered 28/8, 2021 at 18:58 Comment(5)
Can you explain a little more what you mean by "unable to pass data"? Are you seeing an error, is the value being passed to the SimpleHttpOperator not rendering correctly, etc.? – Apocarp
I am getting a 400 Bad Request error in the API response, which happens when the payload is missing. – Suzetta
Unfortunately I'm not able to reproduce the issue on Airflow 2.1.3. Your code works as written (posting to httpbin.org at least), and I don't see any discernible differences between the responses whether a SimpleHttpOperator or a PythonOperator executes the POST request. Is it possible to post the PythonOperator version of the code for comparison? – Apocarp
Thanks. I am using version 2.1.3 as well. I have added the full log of the task in case it helps. I have replaced the actual IP address with <IP-REMOVED> for security reasons. – Suzetta
Thanks! Does this post help at all? #37523957 You might have to json.dumps() the dict before returning it from the "Get_payload" task. – Apocarp

As suggested by @Josh Fell in the comments, I had two mistakes in my DAG.

  1. Wrap the data in json.dumps(data) before returning it from Get_payload (see the note below on why this matters).
  2. Remove multiple_outputs=True from the task decorator of Get_payload.
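
The json.dumps call is the important part: as the traceback shows, SimpleHttpOperator hands data straight to the underlying requests call, and requests form-encodes a plain dict instead of serializing it as JSON, so the API's JSON decoder fails exactly as in the log above. Serializing the payload up front sends the JSON string the server expects. A quick illustration:

import json

payload = {"key_1": "Value 1", "key_2": "Value 2"}

# Passed as a raw dict, requests would form-encode this as
# "key_1=Value+1&key_2=Value+2", which is not JSON.
# Serialized up front, it arrives as the JSON the API expects:
print(json.dumps(payload))  # {"key_1": "Value 1", "key_2": "Value 2"}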

Final code:

import json

from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator

from datetime import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
}


@dag(default_args=default_args, schedule_interval=None, tags=["Http Operators"])
def http_operator():
    @task()
    def Get_payload(**kwargs):
        # STEP 1: Get data from database.

        # STEP 2: Manipulate data.

        # STEP 3: Return payload.
        data = {
            "key_1": "Value 1",
            "key_2": "Value 2",
            "key_3": "Value 3",
            "key_4": "Value 4",
        }

        return json.dumps(data)

    get_data = Get_payload()

    ml_api = SimpleHttpOperator(
        task_id="some_api",
        http_conn_id="http_conn_id",
        method="POST",
        endpoint="/some-path",
        data=get_data,
        headers={"Content-Type": "application/json"},
    )

    get_data >> ml_api


http_operator_dag = http_operator()
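
As a sanity check outside Airflow, the same request can be reproduced with requests directly, which is what the HTTP provider uses under the hood (the URL below is a placeholder for the real endpoint behind http_conn_id):

import json

import requests

payload = json.dumps({
    "key_1": "Value 1",
    "key_2": "Value 2",
})

# Placeholder URL standing in for the real API.
response = requests.post(
    "http://example.com/some-path",
    data=payload,
    headers={"Content-Type": "application/json"},
)
print(response.status_code, response.text)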
Suzetta answered 28/8, 2021 at 21:15 Comment(0)
