我开始学习Airflow。我需要从我的PostgreSQL数据库获取访问令牌,然后使用该访问令牌使用SimpleHttpOperator函数查询API。
这是我的代码:
from airflow.models import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime
import json
default_args = {
'start_date':datetime(2021, 1, 1)
}
def _get_access_token():
request = "SELECT access_token FROM access_token"
postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
connection = postgres_hook.get_conn()
cursor = connection.cursor()
cursor.execute(request)
jobs = cursor.fetchall()
access_token = ([i[0] for i in jobs])
return access_token
with DAG('get_broadworks_subscribers', schedule_interval='@once',
default_args = default_args,
catchup=False) as dag:
# Tasks
get_access_token = PythonOperator(
task_id='get_access_token',
python_callable=_get_access_token
)
get_subscribers_list = SimpleHttpOperator(
task_id = 'get_subscribers_list',
http_conn_id = 'webex',
endpoint = 'v1/broadworks/subscribers/',
method = 'GET',
authorization = "Bearer" + " " + access_token[0],
headers = {
"Authorization": "authorization"
},
response_filter = lambda response: json.loads(response.text),
log_response = True
)
get_access_token >> get_subscribers_list
我遇到了以下错误:
authorization = "Bearer" + " " + access_token[0],
NameError: name 'access_token' is not defined
我希望你能帮助我,非常感谢。