I recently started using Apache Airflow. I am using the TaskFlow API with one decorated task, with id `Get_payload`, and a `SimpleHttpOperator`. The `Get_payload` task gets data from a database, does some data manipulation, and returns a `dict` as the payload.
Problem

I am unable to pass data from the previous task into the next task. Yes, I am aware of XComs, but the whole purpose of using the TaskFlow API is to avoid direct interaction with XComs. I get the error below when `get_data` is passed directly to the `data` property of `SimpleHttpOperator`:
```
airflow.exceptions.AirflowException: 400:BAD REQUEST
```
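For reference, the server-side complaint in the log below ("Failed to decode JSON object") is what happens when the POST body is not valid JSON. A standalone sketch (plain Python, no Airflow) of how a stringified dict fails to decode:

```python
import json

payload = {"key_1": "Value 1"}
# A dict rendered to a string produces Python's single-quoted repr,
# which is not valid JSON and is rejected by the server's JSON parser.
body = str(payload)  # "{'key_1': 'Value 1'}"
try:
    json.loads(body)
except json.JSONDecodeError as exc:
    print("decode failed:", exc)
```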
What have I tried so far?
As mentioned in this SO answer, I used `template_fields` in my custom sensor to declare the field in which to expect the data from the previous task. In the case of `SimpleHttpOperator` I cannot edit the operator to do the same. So how can this be solved for `SimpleHttpOperator`?

I have checked this SO answer and this one as well.
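For context, the `template_fields` pattern from that answer looks roughly like this. This is a standalone sketch: `MyCustomSensor` is a hypothetical name, and `FakeBaseOperator` stands in for Airflow's `BaseOperator`, since in a real DAG Airflow itself Jinja-renders every attribute listed in `template_fields` before `execute()` runs.

```python
class FakeBaseOperator:
    """Stand-in for airflow.models.BaseOperator, for illustration only."""

    def __init__(self, task_id):
        self.task_id = task_id


class MyCustomSensor(FakeBaseOperator):
    # Airflow renders any attribute named here through Jinja templating,
    # which is what lets a value from a previous task flow into the field.
    template_fields = ("payload",)

    def __init__(self, task_id, payload=None):
        super().__init__(task_id)
        self.payload = payload

    def execute(self, context):
        # By the time Airflow calls execute(), self.payload has been rendered.
        return self.payload
```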
DAG:
```python
from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator
from datetime import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
}


@dag(default_args=default_args, schedule_interval=None, tags=["Http Operators"])
def http_operator():
    @task(multiple_outputs=True)
    def Get_payload(**kwargs):
        # STEP 1: Get data from database.
        # STEP 2: Manipulate data.
        # STEP 3: Return payload.
        data = {
            "key_1": "Value 1",
            "key_2": "Value 2",
            "key_3": "Value 3",
            "key_4": "Value 4",
        }
        return data

    get_data = Get_payload()

    ml_api = SimpleHttpOperator(
        task_id="some_api",
        http_conn_id="http_conn_id",
        method="POST",
        endpoint="/some-path",
        data=get_data,
        headers={"Content-Type": "application/json"},
    )

    get_data >> ml_api


http_operator_dag = http_operator()
```
Full log:
```
[2021-08-28 20:28:12,947] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:1094} INFO -
--------------------------------------------------------------------------------
[2021-08-28 20:28:12,971] {taskinstance.py:1095} INFO - Starting attempt 1 of 1
[2021-08-28 20:28:12,971] {taskinstance.py:1096} INFO -
--------------------------------------------------------------------------------
[2021-08-28 20:28:12,982] {taskinstance.py:1114} INFO - Executing <Task(SimpleHttpOperator): clf_api> on 2021-08-28T20:28:10.265689+00:00
[2021-08-28 20:28:12,987] {standard_task_runner.py:52} INFO - Started process 19229 to run task
[2021-08-28 20:28:12,991] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'http_operator', 'clf_api', '2021-08-28T20:28:10.265689+00:00', '--job-id', '71', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/Http_Operator.py', '--cfg-path', '/tmp/tmp4l9hwi4q', '--error-file', '/tmp/tmpk1yrhtki']
[2021-08-28 20:28:12,993] {standard_task_runner.py:77} INFO - Job 71: Subtask clf_api
[2021-08-28 20:28:13,048] {logging_mixin.py:109} INFO - Running <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [running]> on host d332abee08c8
[2021-08-28 20:28:13,126] {taskinstance.py:1251} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=http_operator
AIRFLOW_CTX_TASK_ID=clf_api
AIRFLOW_CTX_EXECUTION_DATE=2021-08-28T20:28:10.265689+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-28T20:28:10.265689+00:00
[2021-08-28 20:28:13,128] {http.py:111} INFO - Calling HTTP method
[2021-08-28 20:28:13,141] {base.py:70} INFO - Using connection to: id: ML_API. Host: <IP-REMOVED>, Port: None, Schema: , Login: dexter, Password: ***, extra: {}
[2021-08-28 20:28:13,144] {http.py:140} INFO - Sending 'POST' to url: http://<IP-REMOVED>/classify
[2021-08-28 20:28:13,841] {http.py:154} ERROR - HTTP error: BAD REQUEST
[2021-08-28 20:28:13,842] {http.py:155} ERROR - <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>400 Bad Request</title>
<h1>Bad Request</h1>
<p>Failed to decode JSON object: Expecting value: line 1 column 1 (char 0)</p>
[2021-08-28 20:28:13,874] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 152, in check_response
    response.raise_for_status()
  File "/home/airflow/.local/lib/python3.8/site-packages/requests/models.py", line 953, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: BAD REQUEST for url: http://<IP-REMOVED>/classify

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/operators/http.py", line 113, in execute
    response = http.run(self.endpoint, self.data, self.headers, self.extra_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 141, in run
    return self.run_and_check(session, prepped_request, extra_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 198, in run_and_check
    self.check_response(response)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 156, in check_response
    raise AirflowException(str(response.status_code) + ":" + response.reason)
airflow.exceptions.AirflowException: 400:BAD REQUEST
[2021-08-28 20:28:13,882] {taskinstance.py:1505} INFO - Marking task as FAILED. dag_id=http_operator, task_id=clf_api, execution_date=20210828T202810, start_date=20210828T202812, end_date=20210828T202813
[2021-08-28 20:28:13,969] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-08-28 20:28:14,043] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check
```
Comments:

- … `SimpleHttpOperator` not rendering correctly, etc.? – Apocarp
- … `SimpleHttpOperator` or `PythonOperator` to execute the POST request. Is it possible to post what the `PythonOperator` version code looks like for a comparison? – Apocarp
- `json.dumps()` the dict before returning from the "Get_payload" task. – Apocarp
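A minimal sketch of that last `json.dumps()` suggestion, assuming the API expects a JSON body: serialize the payload before returning it from the task, so that what ends up in the operator's `data` field is a valid JSON string rather than a Python dict repr.

```python
import json


def get_payload():
    """Build the payload and return it JSON-serialized (sketch of the
    comment's suggestion; the keys mirror the example DAG above)."""
    data = {
        "key_1": "Value 1",
        "key_2": "Value 2",
    }
    # Returning a string means the value that reaches the HTTP POST body
    # is already valid JSON, avoiding the server's decode error.
    return json.dumps(data)


print(get_payload())
```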