Airflow SimpleHttpOperator 支持 HTTPS

7

我正在尝试使用SimpleHttpOperator来消费RESTful API。但是,正如其名称所示,它仅支持HTTP协议,而我需要消费HTTPS URI。因此,现在我必须使用Python的“requests”对象或从应用程序代码中处理调用。但这可能不是标准的方法。因此,我正在寻找其他可用选项,以从Airflow内部消费HTTPS URI。谢谢。


你尝试过在连接中使用https吗?我没有看到必须是http的要求。但由于没有提供版本,我只能根据当前代码库进行检查。 - tobi6
感谢更新。我正在使用Airflow 1.8.0版本。连接组合框只显示HTTP选项。当对HTTP连接进行HTTPS请求时,返回未经授权的401错误。 - Kris
你拉取的服务器是否未经授权?你尝试在服务器上使用curl了吗? - tobi6
@Kris,你找到解决方案了吗? - melchoir55
我已经为这个需求编写了自己的定制解决方案。SimpleHTTPOperator 看起来还不错。但是,在创建连接时选择协议的下拉列表只允许选择 HTTP。因此,最终 URI 将发送一个 HTTP 请求而不是 HTTPS。 - Kris
显示剩余3条评论
7个回答

7
我深入研究了此问题并确认这是Airflow中的一个bug。我已经在这里为它创建了一个票据: https://issues.apache.org/jira/browse/AIRFLOW-2910 目前,最好的做法是覆盖SimpleHttpOperator和HttpHook,以改变HttpHook.get_conn的工作方式(接受https)。我可能会这样做,如果我这样做了,我会发布一些代码。
更新: 操作员覆盖:
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.exceptions import AirflowException
from operators.https_support.https_hook import HttpsHook


class HttpsOperator(SimpleHttpOperator):
    def execute(self, context):
        http = HttpsHook(self.method, http_conn_id=self.http_conn_id)

        self.log.info("Calling HTTP method")

        response = http.run(self.endpoint,
                            self.data,
                            self.headers,
                            self.extra_options)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
        if self.xcom_push_flag:
            return response.text

钩子覆盖

from airflow.hooks.http_hook import HttpHook
import requests


class HttpsHook(HttpHook):
    def get_conn(self, headers):
        """
        Returns http session for use with requests. Supports https.
        """
        conn = self.get_connection(self.http_conn_id)
        session = requests.Session()

        if "://" in conn.host:
            self.base_url = conn.host
        elif conn.schema:
            self.base_url = conn.schema + "://" + conn.host
        elif conn.conn_type:  # https support
            self.base_url = conn.conn_type + "://" + conn.host
        else:
            # schema defaults to HTTP
            self.base_url = "http://" + conn.host

        if conn.port:
            self.base_url = self.base_url + ":" + str(conn.port) + "/"
        if conn.login:
            session.auth = (conn.login, conn.password)
        if headers:
            session.headers.update(headers)

        return session

使用方法:

作为 SimpleHttpOperator 的替代品,可以直接使用。


当HTTP API返回200以外的响应代码时,如何在此处检查响应代码。 - dks551

5
这篇文章已经有几个月了,但是就我而言,使用Airflow 1.10.2进行HTTPS调用没有任何问题。
在我的最初测试中,我正在请求来自Sendgrid的模板,因此连接设置如下:
Conn Id   : sendgrid_templates_test
Conn Type : HTTP   
Host      :   https://api.sendgrid.com/
Extra     : { "authorization": "Bearer [my token]"}

然后在dag代码中:

get_templates = SimpleHttpOperator(
        task_id='get_templates',
        method='GET',
        endpoint='/v3/templates',
        http_conn_id = 'sendgrid_templates_test',
        trigger_rule="all_done",
        xcom_push=True
        dag=dag,
    )

我试过了,它起作用了。同时请注意,我的请求发生在分支操作之后,所以我需要适当设置触发规则(设置为“all_done”,以确保即使跳过其中一个分支也会触发),这与问题无关,但我想指出来。

现在要明确的是,由于我没有启用证书验证,因此我收到了不安全请求警告。但您可以在下面看到结果日志。

[2019-02-21 16:15:01,333] {http_operator.py:89} INFO - Calling HTTP method
[2019-02-21 16:15:01,336] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,335] {base_hook.py:83} INFO - Using connection to: id: sendgrid_templates_test. Host:  https://api.sendgrid.com/, Port: None, Schema: None, Login: None, Password: XXXXXXXX, extra: {'authorization': 'Bearer [my token]'}
[2019-02-21 16:15:01,338] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,337] {http_hook.py:126} INFO - Sending 'GET' to url:  https://api.sendgrid.com//v3/templates
[2019-02-21 16:15:01,956] {logging_mixin.py:95} WARNING - /home/csconnell/.pyenv/versions/airflow/lib/python3.6/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)
[2019-02-21 16:15:05,242] {logging_mixin.py:95} INFO - [2019-02-21 16:15:05,241] {jobs.py:2527} INFO - Task exited with return code 0

谢谢您的回复,我确认 https 是可以这样工作的,但是您能否在不覆盖 HttpHook 的情况下将头信息放入 Extra 连接字段中?对我来说,这种方法并不起作用。 - Nicolas
在动态令牌的情况下,返回仅翻译的文本内容。 - Tiago Medici
在动态令牌的情况下,返回仅翻译的文本内容。 - undefined

4
我遇到了HTTP/HTTPS连接的问题,当我尝试使用环境变量进行连接设置时(虽然在UI上设置连接时可以工作)。
我已经查看了@melchoir55提出的问题(https://issues.apache.org/jira/browse/AIRFLOW-2910),你不需要为此创建自定义操作器,问题并不是HttpHook或HttpOperator不能使用HTTPS,问题在于get_hook在处理HTTP时解析连接字符串的方式,它实际上将第一部分 (http:// 或 https://) 理解为连接类型。
简而言之,您不需要自定义操作器,只需按以下方式在 env 中设置连接即可: AIRFLOW_CONN_HTTP_EXAMPLE=http://https%3a%2f%2fexample.com/ 而不是: AIRFLOW_CONN_HTTP_EXAMPLE=https://example.com/ 或在 UI 上设置连接。
这不是一个直观的连接设置方式,但我认为他们正在研究更好地解析 Ariflow 2.0 连接的方法。

1
这个神奇的技巧就是这样!我们使用AWS Secrets Manager来管理连接,而不是使用Airflow本地的连接管理器,我们必须这样做才能让https正常工作。 - rotten

4
在 Airflow 2.x 中,你可以在设置连接时传递 https 作为模式值来使用 HTTPS URL,并仍然可以像下面所示使用 SimpleHttpOperator
    my_api = SimpleHttpOperator(
        task_id="my_api",
        http_conn_id="YOUR_CONN_ID",
        method="POST",
        endpoint="/base-path/end-point",
        data=get_data,
        headers={"Content-Type": "application/json"},
    )

enter image description here


1

我正在使用Airflow 2.1.0,以下设置适用于https API

  • 在连接UI中,通常设置主机名,无需在模式字段中指定“https”,如果您的API服务器需要登录帐户和密码,请不要忘记设置。 连接UI设置

  • 构建任务时,在SimpleHttpOperator中添加extra_options参数,并将您的CA_bundle证书文件路径作为键验证的值,如果您没有证书文件,则使用false跳过验证。 任务定义

参考:这里


0

我们可以将以下一行代码放入HttpsOperator(SimpleHttpOperator)的上方,而不是实现HttpsHook:

...

self.extra_options['verify'] = True

response = http.run(self.endpoint,
                        self.data,
                        self.headers,
                        self.extra_options)
...

0

在Airflow 2中,该问题已经得到解决。 只需检查以下内容:

  • 连接UI表单中的主机名不要以/结尾
  • SimpleHttpOperatorendpoint参数以/开头

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接