Airflow: How do I pass data from a decorated task to SimpleHttpOperator?


I recently started using Apache Airflow. I am using the TaskFlow API with one decorated task, Get_payload, and a SimpleHttpOperator. The Get_payload task fetches data from a database, does some data manipulation, and returns a dict as the payload.

Problem

I am unable to pass data from the previous task into the next one. Yes, I know about XComs, but the whole point of the TaskFlow API is to avoid interacting with XComs directly. When get_data is passed directly to the data attribute of SimpleHttpOperator, I get the following error:

airflow.exceptions.AirflowException: 400:BAD REQUEST

What have I tried so far?

As described in this SO answer, I used template_fields in a custom sensor to define the field that expects data from the previous task. I cannot edit SimpleHttpOperator to do the same. So how do I solve this with SimpleHttpOperator?
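For context, template_fields is simply a class attribute listing which operator attributes Airflow renders with Jinja before execute() runs. A toy sketch of the idea (not Airflow's actual implementation; FakeOperator and render are made-up stand-ins, and str.format stands in for Jinja):

```python
class FakeOperator:
    # Attributes to render at runtime, analogous to Airflow's template_fields.
    template_fields = ("data",)

    def __init__(self, data):
        self.data = data

def render(op, context):
    # Crude stand-in for the Jinja rendering step the executor performs.
    for field in op.template_fields:
        value = getattr(op, field)
        if isinstance(value, str):
            setattr(op, field, value.format(**context))

op = FakeOperator(data="payload for run {run_id}")
render(op, {"run_id": "manual_1"})
print(op.data)  # payload for run manual_1
```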

I have already checked this SO answer and this one.

DAG:

from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator

from datetime import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
}


@dag(default_args=default_args, schedule_interval=None, tags=["Http Operators"])
def http_operator():
    @task(multiple_outputs=True)
    def Get_payload(**kwargs):
        # STEP 1: Get data from database.

        # STEP 2: Manipulate data.

        # STEP 3: Return payload.
        data = {
            "key_1": "Value 1",
            "key_2": "Value 2",
            "key_3": "Value 3",
            "key_4": "Value 4",
        }

        return data

    get_data = Get_payload()

    ml_api = SimpleHttpOperator(
        task_id="some_api",
        http_conn_id="http_conn_id",
        method="POST",
        endpoint="/some-path",
        data=get_data,
        headers={"Content-Type": "application/json"},
    )

    get_data >> ml_api


http_operator_dag = http_operator()

Full log:

[2021-08-28 20:28:12,947] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:1094} INFO - 
--------------------------------------------------------------------------------
[2021-08-28 20:28:12,971] {taskinstance.py:1095} INFO - Starting attempt 1 of 1
[2021-08-28 20:28:12,971] {taskinstance.py:1096} INFO - 
--------------------------------------------------------------------------------
[2021-08-28 20:28:12,982] {taskinstance.py:1114} INFO - Executing <Task(SimpleHttpOperator): clf_api> on 2021-08-28T20:28:10.265689+00:00
[2021-08-28 20:28:12,987] {standard_task_runner.py:52} INFO - Started process 19229 to run task
[2021-08-28 20:28:12,991] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'http_operator', 'clf_api', '2021-08-28T20:28:10.265689+00:00', '--job-id', '71', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/Http_Operator.py', '--cfg-path', '/tmp/tmp4l9hwi4q', '--error-file', '/tmp/tmpk1yrhtki']
[2021-08-28 20:28:12,993] {standard_task_runner.py:77} INFO - Job 71: Subtask clf_api
[2021-08-28 20:28:13,048] {logging_mixin.py:109} INFO - Running <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [running]> on host d332abee08c8
[2021-08-28 20:28:13,126] {taskinstance.py:1251} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=http_operator
AIRFLOW_CTX_TASK_ID=clf_api
AIRFLOW_CTX_EXECUTION_DATE=2021-08-28T20:28:10.265689+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-28T20:28:10.265689+00:00
[2021-08-28 20:28:13,128] {http.py:111} INFO - Calling HTTP method
[2021-08-28 20:28:13,141] {base.py:70} INFO - Using connection to: id: ML_API. Host: <IP-REMOVED>, Port: None, Schema: , Login: dexter, Password: ***, extra: {}
[2021-08-28 20:28:13,144] {http.py:140} INFO - Sending 'POST' to url: http://<IP-REMOVED>/classify
[2021-08-28 20:28:13,841] {http.py:154} ERROR - HTTP error: BAD REQUEST
[2021-08-28 20:28:13,842] {http.py:155} ERROR - <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>400 Bad Request</title>
<h1>Bad Request</h1>
<p>Failed to decode JSON object: Expecting value: line 1 column 1 (char 0)</p>

[2021-08-28 20:28:13,874] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 152, in check_response
    response.raise_for_status()
  File "/home/airflow/.local/lib/python3.8/site-packages/requests/models.py", line 953, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: BAD REQUEST for url: http://<IP-REMOVED>/classify

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/operators/http.py", line 113, in execute
    response = http.run(self.endpoint, self.data, self.headers, self.extra_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 141, in run
    return self.run_and_check(session, prepped_request, extra_options)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 198, in run_and_check
    self.check_response(response)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/http/hooks/http.py", line 156, in check_response
    raise AirflowException(str(response.status_code) + ":" + response.reason)
airflow.exceptions.AirflowException: 400:BAD REQUEST
[2021-08-28 20:28:13,882] {taskinstance.py:1505} INFO - Marking task as FAILED. dag_id=http_operator, task_id=clf_api, execution_date=20210828T202810, start_date=20210828T202812, end_date=20210828T202813
[2021-08-28 20:28:13,969] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-08-28 20:28:14,043] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check


Can you elaborate on what "unable to pass data" means? Are you seeing an error, or is the value passed to SimpleHttpOperator not being rendered properly, etc.? - Josh Fell
The API responds with a 400 Bad Request error, which is typically caused by a missing payload. - Dheemanth Bhat
Unfortunately, I can't reproduce this issue on Airflow 2.1.3. Your code works as-is (at least POSTing to httpbin.org). I don't see any obvious difference between the responses when doing a POST with SimpleHttpOperator versus PythonOperator. Would you mind posting the PythonOperator version of your code for comparison? - Josh Fell
Thanks. I'm on version 2.1.3 as well. I've added the full log of the task in case it helps. For security reasons, I've replaced the actual IP address with <IP-REMOVED>. - Dheemanth Bhat
Thanks! Does this post help? https://dev59.com/SloU5IYBdhLWcg3wYWOx You probably need to json.dumps() the dictionary before returning it from the "Get_payload" task. - Josh Fell
1 Answer


As @Josh Fell suggested in the comments, I had two mistakes in my DAG.

  1. Wrap data in json.dumps(data) before returning it from Get_payload.
  2. Remove multiple_outputs=True from the Get_payload task decorator.
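The json.dumps() change matters because Python's str() representation of a dict is not valid JSON, while json.dumps() produces a body the API can actually decode:

```python
import json

data = {"key_1": "Value 1"}

# Python's repr uses single quotes, which JSON does not allow.
print(str(data))         # {'key_1': 'Value 1'}

# json.dumps() emits double-quoted, standards-compliant JSON.
print(json.dumps(data))  # {"key_1": "Value 1"}
```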

Final code:

import json

from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator

from datetime import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
}


@dag(default_args=default_args, schedule_interval=None, tags=["Http Operators"])
def http_operator():
    @task()
    def Get_payload(**kwargs):
        # STEP 1: Get data from database.

        # STEP 2: Manipulate data.

        # STEP 3: Return payload.
        data = {
            "key_1": "Value 1",
            "key_2": "Value 2",
            "key_3": "Value 3",
            "key_4": "Value 4",
        }

        return json.dumps(data)

    get_data = Get_payload()

    ml_api = SimpleHttpOperator(
        task_id="some_api",
        http_conn_id="http_conn_id",
        method="POST",
        endpoint="/some-path",
        data=get_data,
        headers={"Content-Type": "application/json"},
    )

    get_data >> ml_api


http_operator_dag = http_operator()
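A plausible reading of the "Failed to decode JSON object: Expecting value: line 1 column 1 (char 0)" message in the log: when a plain dict reaches the underlying requests call as data, it gets form-encoded rather than serialized to JSON, so the body starts with a key name instead of a JSON value. The stdlib can illustrate what such a body looks like (hypothetical keys, mirroring the DAG above):

```python
import json
from urllib.parse import urlencode

data = {"key_1": "Value 1", "key_2": "Value 2"}

# This is roughly what a form-encoded request body looks like - not JSON.
body = urlencode(data)
print(body)  # key_1=Value+1&key_2=Value+2

# A JSON parser rejects it at the very first character, matching the log.
try:
    json.loads(body)
except json.JSONDecodeError as e:
    print(e)  # Expecting value: line 1 column 1 (char 0)
```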

Content provided by Stack Overflow.