如何访问Airflow SimpleHttpOperator GET请求的响应

22

我正在学习Airflow,有一个简单的问题。下面是我的DAG,名为dog_retriever

import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json



default_args = {
    'owner': 'Loftium',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 9),
    'email': 'rachel@loftium.com',
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
}

dag = DAG('dog_retriever',
    schedule_interval='@once',
    default_args=default_args)

t1 = SimpleHttpOperator(
    task_id='get_labrador',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breed/labrador/images',
    headers={"Content-Type": "application/json"},
    dag=dag)

t2 = SimpleHttpOperator(
    task_id='get_breeds',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breeds/list',
    headers={"Content-Type": "application/json"},
    dag=dag)
    
t2.set_upstream(t1)

作为测试Airflow的手段,我只是向这个非常简单的http://dog.ceo API中的一些端点发出了两个GET请求。目标是学习如何通过Airflow处理一些检索到的数据。
执行很顺利-我的代码成功地调用了任务t1和t2中的端点,我可以在Airflow UI中看到它们按照我编写的set_upstream规则的正确顺序被记录下来。
我无法弄清楚的是如何访问这两个任务的JSON响应。看起来很简单,但我无法弄清楚。在SimpleHtttpOperator中,我看到一个response_check参数,但没有任何东西可以简单地打印、存储或查看JSON响应。
谢谢。

嗨!你解决了吗,如何从任务t1的响应中访问任务t2中的数据?如果你能分享这个信息就太好了。Chengzhi的答案讲解了如何推送和获取,但如何在任务 t2中拉取呢? - Jacobian
3个回答

26

既然这是SimpleHttpOperator,实际的JSON数据被推送到XCOM,你可以从那里获取。以下是该操作的代码行:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/http_operator.py#L87

您需要做的是设置xcom_push=True ,因此您的第一个t1将如下所示:

t1 = SimpleHttpOperator(
    task_id='get_labrador',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breed/labrador/images',
    headers={"Content-Type": "application/json"},
    xcom_push=True,
    dag=dag)

您应该能够在XCOM中找到所有带有返回值的JSON,在此处可以找到有关XCOM的更多详细信息:https://airflow.incubator.apache.org/concepts.html#xcoms


谢谢@Chengzhi,这个可行。虽然我想从现在开始只使用PythonOperator。 - Rachel Lanman
4
@Chengzhi. 你好!请问您能否分享一下第二个SimpleHttpOperator任务t2应该如何编写,以便使用第一个任务的数据。问题是,我看到许多示例都说 - 只需使用xcom推送数据,但它们没有显示接收方部分,或者其他可能使用前一个任务推送的数据的任务。 - Jacobian
1
在Airflow v2中,这不再起作用。 - Galuoises

15
我主要为那些试图(或想要)从进程中调用Airflow工作流DAG并接收任何结果数据的人添加此答案。
重要的是要理解,运行DAG需要进行HTTP POST,并且该POST的响应在Airflow中是硬编码的。换句话说,如果没有更改Airflow代码本身,Airflow永远不会返回除状态代码和消息之外的任何内容给请求进程。
Airflow似乎主要用于创建ETL(抽取、转换、加载)工作流的数据管道,现有的Airflow Operators(例如SimpleHttpOperator)可以从RESTful Web服务获取数据,处理它,并使用其他操作符将其写入数据库,但不会在运行工作流DAG的HTTP POST响应中返回它。
即使这些操作符在响应中返回了数据,查看Airflow源代码也可以确认trigger_dag()方法不会检查或返回它。

apache_airflow_airflow_www_api_experimental_endpoints.py

apache_airflow_airflow_api_client_json_client.py

它所返回的只是这个确认信息:

调度服务中接收到了Airflow DagRun消息

由于Airflow是开源的,我想我们可以修改trigger_dag()方法来返回数据,但这样我们就必须维护分支代码库,并且我们将无法使用基于云的、基于Airflow的服务,如Google Cloud平台上的Cloud Composer,因为它不包括我们的修改。

更糟糕的是,Apache Airflow甚至没有正确地返回其硬编码的状态消息。

当我们成功地向Airflow的/dags/{DAG-ID}/dag_runs端点POST时,我们会收到一个“200 OK”的响应,而不是我们应该收到的“201 Created”响应。而Airflow将响应的Content body硬编码为其“Created…”状态消息。标准是在响应头中返回新创建资源的Uri,而不是在body中返回……这样可以使body自由地返回在创建期间(或由此产生的)任何生成/聚合的数据。

我把这个缺陷归因于“盲目”(或者我称之为“天真”的)敏捷/MVP驱动方法,它只会添加被要求的功能,而不是保持意识并留出更多通用实用空间。由于Airflow主要用于为(和由)数据科学家(而不是软件工程师)创建数据管道,Airflow运算符可以使用其专有的内部XCom特性相互共享数据,如@Chengzhi有益的回答所指出的那样(谢谢!),但在任何情况下都不能将数据返回给启动DAG的请求者,即SimpleHttpOperator可以从第三方RESTful服务检索数据,并且可以通过XCom与PythonOperator共享该数据,以便对其进行丰富、聚合和/或转换。然后,PythonOperator可以将其数据与PostgresOperator共享,后者直接将结果存储在数据库中。但结果永远无法返回请求执行该工作的进程,即我们的编排服务,使Airflow对除了当前用户驱动的用例之外的任何用例都没有用处。
我的收获(至少对我来说)是:1)不要过分赋予任何人或任何组织太多的专业知识。Apache是一个重要的组织,在软件开发中拥有深厚和重要的根基...但他们并不完美。2)始终要注意内部的专有解决方案。开放、基于标准的解决方案已经从许多不同的角度得到了审查和验证,而不仅仅是一个角度。
我花了将近一周的时间寻找做一件看起来非常简单和合理的事情的不同方法。我希望这个答案能为其他人节省一些时间。

那正是我所期望和寻找的!感谢您的澄清! - Meiko Watu

0
I was able to fetch a json from an api , process it and send it to another api .Follwing is my dag

from airflow.models import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.models import Variable

from datetime import datetime
import json

default_args = {
    'start_date': datetime(2020, 1, 1)
}


def _processing_user(ti):
    users_txt = ti.xcom_pull(task_ids=["fetch_user"])[0]
    users = json.loads(users_txt)
    if not len(users) or 'results' not in users:
        raise ValueError("User is empty")
    user = users['results'][0]
    user_map = {
        'firstname':user['name']['first'],
        'lastname':user['name']['last'],
        'name': user['name']['first']+user['name']['last']
        
    }
    processed_user = json.dumps(user_map)
    Variable.set("user", processed_user)
    


with DAG('user_data_processing',
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:
         
    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )

    fetch_user = SimpleHttpOperator(
        task_id='fetch_user',
        http_conn_id='user_api',
        endpoint='api/',
        method='GET'
    )

    processing_user = PythonOperator(
        task_id='processing_user',
        python_callable=_processing_user
    )

    send_response = SimpleHttpOperator(
        task_id="sendresponse",
        http_conn_id="http_conn_id",
        method="POST",
        endpoint="apacheairflowcreatename",
        data="{{ var.json.user }}",
        headers={"Content-Type": "application/json"}
    )
    
    print_user = BashOperator(
        task_id='log_user',
        bash_command='echo "{{ var.value.user }}"',
    )

is_api_available >> fetch_user >> processing_user >> send_response

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接