我使用Apache Airflow创建了一些DAG,其中一些不按照时间表运行。
我正在尝试找到一种方法,可以从Python脚本内部触发特定的DAG运行。这是否可行?我该如何操作?
编辑 --- Python脚本将在与所有我的DAG不同的项目中运行。
我使用Apache Airflow创建了一些DAG,其中一些不按照时间表运行。
我正在尝试找到一种方法,可以从Python脚本内部触发特定的DAG运行。这是否可行?我该如何操作?
编辑 --- Python脚本将在与所有我的DAG不同的项目中运行。
在触发Airflow DAG运行时,您有多种选择。
airflow python包提供了一个本地客户端,您可以在python脚本内使用它来触发dag。例如:
from airflow.api.client.local_client import Client
c = Client(None, None)
c.trigger_dag(dag_id='test_dag_id', run_id='test_run_id', conf={})
您可以使用Airflow CLI手动触发DAG的运行。有关如何使用CLI触发DAG的更多信息,请单击此处。
您还可以使用Airflow REST api来触发DAG运行。有关更多信息,请单击此处。
从Python中的第一个选项可能最适合您(这也是我个人过去所做的)。但是您理论上可以使用subprocess与Python中的CLI进行交互,或者使用类似requests的库从Python内部与REST API进行交互。
boto3
库和REST POST请求的解决方案。import boto3
import requests
def TriggerAirflowDAG(mwaa_environment, dag_id):
client = boto3.client("mwaa")
token = client.create_cli_token(Name=mwaa_environment)
url = "https://{0}/aws_mwaa/cli".format(token["WebServerHostname"])
body = f"trigger_dag {dag_id}"
headers = {
"Authorization": "Bearer " + token["CliToken"],
"Content-Type": "text/plain"
}
return requests.post(url, data=body, headers=headers)
发起DAG运行的用户/角色必须具有AmazonMWAAAirflowCliAccess
策略。