Airflow:如何将从我的数据库中获取的变量传递给SimpleHttpOperator函数

3

我开始学习Airflow。我需要从我的PostgreSQL数据库获取访问令牌,然后使用该访问令牌使用SimpleHttpOperator函数查询API。

这是我的代码:

from airflow.models import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python import PythonOperator

from datetime import datetime
import json


default_args = {
    'start_date':datetime(2021, 1, 1)
}

def _get_access_token():
    request = "SELECT access_token FROM access_token"
    postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
    connection = postgres_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    jobs = cursor.fetchall()
    access_token = ([i[0] for i in jobs])

    return access_token


with DAG('get_broadworks_subscribers', schedule_interval='@once',
    default_args = default_args,
    catchup=False) as dag:

    # Tasks

    get_access_token = PythonOperator(
    task_id='get_access_token', 
    python_callable=_get_access_token
    )

    get_subscribers_list = SimpleHttpOperator(
        task_id = 'get_subscribers_list',
        http_conn_id = 'webex',
        endpoint = 'v1/broadworks/subscribers/',
        method = 'GET',
        authorization = "Bearer" + " " + access_token[0],
        headers = {
            "Authorization": "authorization"
        },
        response_filter = lambda response: json.loads(response.text),
        log_response = True
    )

get_access_token >> get_subscribers_list

我遇到了以下错误:

    authorization = "Bearer" + " " + access_token[0],
NameError: name 'access_token' is not defined

我希望你能帮助我,非常感谢。

1个回答

2
您可能期望Python函数返回值供以后在代码中使用。但这不是Airflow的工作方式。任务之间不共享数据,但可以通过Xcom共享元数据。XcomPythonOperator的返回值推送到xcom(元存储中的表)。然后下游任务可以读取该值并在字段被模板化时使用它。此外,SimpleHttpOperator没有authorization参数。
因此,您的代码可能如下所示:
get_subscribers_list = SimpleHttpOperator(
    task_id = 'get_subscribers_list',
    http_conn_id = 'webex',
    endpoint = 'v1/broadworks/subscribers/',
    method = 'GET',
    headers = {
        "Authorization": """Bearer {{ task_instance.xcom_pull(task_ids="get_access_token") }} """
    },
    response_filter = lambda response: json.loads(response.text),
    log_response = True
)

由于headers模板化的,您可以从上游任务中获取xcom值。

注意:我不建议像这样传递令牌。您可能希望考虑将其安全地存储在Airflow变量中。这还将使您免于在单独的任务中从数据库查询它的麻烦。如果您将其存储在变量中,您需要更改的只有:

    headers = {
        "Authorization": """Bearer {{ var.value.get('my_var_name') }} """
    }

请注意,Airflow会自动屏蔽包含以下任何一项的键的值:'password'、'secret'、'passwd'、'authorization'、'api_key'、'apikey'、'access_token'。但是,如果您选择使用不包含这些内容的键,则仍然可以隐藏它,只需将该字符串添加到airflow.cfg中的sensitive_var_conn_names中即可。有关此信息的更多信息,请参见文档

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接