Triggering Cloud Composer with a Google Cloud Function


I ran the code below, but I get an error when I try to trigger the DAG from a Cloud Function. Here are the error and the code:



Log output from gcs-dag-trigger-function (execution ID 8bhxprce8hze):
Traceback (most recent call last):
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/flask/app.py", line 2073, in wsgi_app
    response = self.full_dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/flask/app.py", line 1518, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/flask/app.py", line 1516, in full_dispatch_request
    rv = self.dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/flask/app.py", line 1502, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/functions_framework/__init__.py", line 171, in view_func
    function(data, context)
  File "/workspace/main.py", line 52, in trigger_dag
    make_iap_request(
  File "/workspace/main.py", line 91, in make_iap_request
    raise Exception(
Exception: Bad response from application: 404 / {'Date': 'Mon, 18 Jul 2022 15:03:44 GMT', 'Content-Type': 'text/html; charset=utf-8', 'Vary': 'Accept-Encoding', 'Server': 'gunicorn', 'X-Robots-Tag': 'noindex, nofollow', 'Set-Cookie': 'session=616e9fda-1cd1-4a96-b9e2-57a3ea0f78bb.tblTOCMLoOZPdPTHgbbMepCbRbI; Expires=Wed, 17-Aug-2022 15:03:44 GMT; HttpOnly; Path=/; SameSite=Lax', 'Content-Encoding': 'gzip', 'Via': '1.1 google', 'Alt-Svc': 'h3=":443"; ma=2592000,h3-29=":443"; ma=2592000,h3-Q050=":443"; ma=2592000,h3-Q046=":443"; ma=2592000,h3-Q043=":443"; ma=2592000,quic=":443"; ma=2592000; v="46,43"', 'Transfer-Encoding': 'chunked'} / '\n\n<!DOCTYPE html>\n<html lang="en">\n  <head>\n    <title>Airflow 404</title>\n    <link rel="icon" type="image/png" href="/static/pin_32.png">\n  </head>\n  <body>\n    <div style="font-family: verdana; text-align: center; margin-top: 200px;">\n      <img src="/static/pin_100.png" width="50px" alt="pin-logo" />\n      <h1>Airflow 404</h1>\n      <p>Page cannot be found.</p>\n      <a href="/">Return to the main page</a>\n      <p>fbffada7b897</p>\n    </div>\n  </body>\n</html>'

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = 'xxxxxxxxx-gtld8n5rbu8ncs3l80fvnb2903pdq8p2.apps.googleusercontent.com'
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = 'xxxxxxxxxxxxxxxx-tp'
    # The name of the DAG you wish to trigger
    dag_name = 'GcsToBigQueryTriggered'

    if USE_EXPERIMENTAL_API:
        endpoint = f'api/experimental/dags/{dag_name}/dag_runs'
        json_data = {'conf': data, 'replace_microseconds': 'false'}
    else:
        endpoint = f'api/v1/dags/{dag_name}/dagRuns'
        json_data = {'conf': data}
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/'
        + endpoint
    )
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(
        webserver_url, client_id, method='POST', json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text
# END COPIED IAP CODE

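Thanks a lot for your help. I have removed sensitive personal information and replaced it with xxx. I have tried many times to trigger the Cloud Composer DAG without success, and I'm not sure why the error occurs.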


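Judging by the code from the documentation, it looks like you are running Cloud Composer 1. Can you confirm that? Also, which Airflow version does your deployment use? The configuration needed to call the API from a Cloud Function differs between versions. - ErnestoC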
@ErnestoC Yes, I am using Cloud Composer 1, and according to the Cloud Composer console my Airflow version is v2.2.5+composer. Do I need to change anything in my code? - DamianO
@ErnestoC An addition: I also tried reproducing the call with a curl command: curl --request POST --header "Authorization: Bearer $(gcloud auth print-identity-token)" --data "{'conf': ${DATA}}" "https://f2aced753e4458aacp-tp.appspot.com/api/v1/dags/GcsToBigQueryTriggered/dagRuns" and got this error message: Invalid IAP credentials: JWT audience doesn't match this application ('aud' claim (618104708054-9r9s1c4alg36erliucho9t52n32n6dgq.apps.googleusercontent.com) doesn't match expected value (336413571322-scq9vkcuu9164v6ihsb75in4tkvsaab0.apps.googleusercontent.com)) - DamianO
1 Answer

In your code, USE_EXPERIMENTAL_API is set to True, which means your function calls Airflow's experimental REST API (api/experimental/dags/{dag_name}/dag_runs).
Because you are running Airflow 2, you need to override the following Airflow configuration options so that the experimental API accepts requests:
[api]
auth_backend = airflow.api.auth.backend.default
enable_experimental_api = True
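On Composer, an override is identified by its section and key, and the gcloud CLI's `--update-airflow-configs` flag takes overrides as comma-separated `SECTION-KEY=VALUE` pairs. The sketch below, assuming both options belong to Airflow's `[api]` section, builds that flag value from a nested dict; the helper name `to_update_airflow_configs` is hypothetical and only for illustration.

```python
def to_update_airflow_configs(overrides: dict) -> str:
    """Flatten {section: {key: value}} into gcloud's SECTION-KEY=VALUE list.

    Hypothetical helper: the result is meant for
    `gcloud composer environments update ... --update-airflow-configs=<result>`.
    """
    pairs = []
    for section, options in overrides.items():
        for key, value in options.items():
            # Composer names an override "<section>-<key>".
            pairs.append(f"{section}-{key}={value}")
    return ",".join(pairs)


# The two overrides from the answer, assumed to live in the [api] section.
API_OVERRIDES = {
    "api": {
        "auth_backend": "airflow.api.auth.backend.default",
        "enable_experimental_api": "True",
    }
}

print(to_update_airflow_configs(API_OVERRIDES))
# api-auth_backend=airflow.api.auth.backend.default,api-enable_experimental_api=True
```

The printed string would go after `--update-airflow-configs=` in `gcloud composer environments update ENVIRONMENT --location LOCATION`, where ENVIRONMENT and LOCATION are placeholders for your own values. The same section/key pairs can also be entered by hand on the environment's Airflow configuration overrides page.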

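The cURL command you showed doesn't work because the identity token has to be requested with the client ID as its audience, passed via --audiences; that is what the 'aud' mismatch in your error message refers to. I tested the following request and could see the DAG run get queued (provided the configuration is overridden as described above for the experimental API).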
curl -X "POST" \
  "https://<WEBSERVER_ID>.appspot.com/api/experimental/dags/<DAG_NAME>/dag_runs" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $(gcloud auth print-identity-token --audiences=<CLIENT_ID_STRING>.apps.googleusercontent.com)" \
  -d '{"conf": {}, "replace_microseconds": "false"}'
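An audience mismatch like the one in the question can be spotted locally before sending any request: an identity token is a JWT, and its `aud` claim should equal the webserver's client ID. This is a minimal debugging sketch using only the standard library. It decodes the payload without verifying the signature, so it is for inspection only, and the names `jwt_audience` and `audience_matches` are hypothetical.

```python
import base64
import json


def jwt_audience(token: str) -> str:
    """Return the 'aud' claim of a JWT WITHOUT verifying its signature.

    Debugging aid only: never base an authorization decision on unverified claims.
    """
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("not a JWT: expected header.payload.signature")
    payload_b64 = parts[1]
    # JWT segments are unpadded base64url; restore '=' padding before decoding.
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    claims = json.loads(base64.urlsafe_b64decode(padded))
    return claims["aud"]


def audience_matches(token: str, client_id: str) -> bool:
    """True if the token was issued for the given IAP client ID."""
    return jwt_audience(token) == client_id
```

For example, `token` could be the output of `gcloud auth print-identity-token --audiences=<CLIENT_ID_STRING>.apps.googleusercontent.com` and `client_id` the value of `client_id` in the question's code. A token printed without `--audiences` would be expected to fail this check, matching the IAP error.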

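If you are not using the experimental API, set the USE_EXPERIMENTAL_API variable in your function to False. Depending on the length of your runtime service account's email (there is a 64-character limit), you must add the service account as an Airflow user manually, either through the console or with gcloud; the service account must also be granted the Composer User role. The cURL request then targets the stable REST endpoint instead: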
curl -X "POST" \
  "https://<WEBSERVER_ID>.appspot.com/api/v1/dags/<DAG_NAME>/dagRuns" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $(gcloud auth print-identity-token --audiences=<CLIENT_ID_STRING>.apps.googleusercontent.com)" \
  -d '{"conf": {}}'
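For comparison with the two cURL calls, the sketch below builds (but does not send) the same POST for either API using `urllib.request`, mirroring the `USE_EXPERIMENTAL_API` branch in `trigger_dag`. Serializing the body with `json.dumps` guarantees valid JSON; a hand-written string such as `{'conf': ...}` with single quotes is not valid JSON. The function name `build_dag_run_request` and the placeholder values `example-tp` and `TOKEN` are assumptions for illustration; a real token would come from `gcloud auth print-identity-token --audiences=...` or `id_token.fetch_id_token` as in the question's code.

```python
import json
import urllib.request


def build_dag_run_request(webserver_id, dag_name, conf, token, use_experimental_api=False):
    """Build (but do not send) the IAP-authenticated POST that triggers a DAG run.

    Hypothetical helper mirroring trigger_dag(): the endpoint and body depend
    on which Airflow REST API is used; the URL shape is Composer 1's
    https://<webserver_id>.appspot.com/.
    """
    if use_experimental_api:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        body = {"conf": conf, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        body = {"conf": conf}
    return urllib.request.Request(
        f"https://{webserver_id}.appspot.com/{endpoint}",
        data=json.dumps(body).encode("utf-8"),  # always valid JSON
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


req = build_dag_run_request("example-tp", "GcsToBigQueryTriggered", {}, token="TOKEN")
print(req.full_url)  # https://example-tp.appspot.com/api/v1/dags/GcsToBigQueryTriggered/dagRuns
print(req.data)      # b'{"conf": {}}'
```

Sending it would be `urllib.request.urlopen(req)`; inside the Cloud Function, the question's `make_iap_request` already does the equivalent with `requests`.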

Let me know if this helps!

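You're awesome!!! I finally got it working!! You were completely right! I changed USE_EXPERIMENTAL_API to False and manually added an Airflow user for my existing service account! I can't tell you how grateful I am. I've been working on this problem for weeks!! @ErnestoC - DamianO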
If anyone wants a detailed walkthrough of how I ran this code, I published an explanatory article on Medium.com [link](https://dohmhen.medium.com/advanced-google-cloud-composer-pipeline-cloud-function-trigger-bb6d2a913f2c). - DamianO
