如何在Python脚本中触发Airflow的DAG运行?

11

我使用Apache Airflow创建了一些DAG,其中一些不按照时间表运行。
我正在尝试找到一种方法,可以从Python脚本内部触发特定的DAG运行。这是否可行?我该如何操作?

编辑 --- Python脚本将在与所有我的DAG不同的项目中运行。

2个回答

18

在触发Airflow DAG运行时,您有多种选择。

使用Python

airflow python包提供了一个本地客户端,您可以在python脚本内使用它来触发dag。例如:

from airflow.api.client.local_client import Client

c = Client(None, None)
c.trigger_dag(dag_id='test_dag_id', run_id='test_run_id', conf={})

使用Airflow CLI

您可以使用Airflow CLI手动触发DAG的运行。有关如何使用CLI触发DAG的更多信息,请单击此处

使用Airflow REST API

您还可以使用Airflow REST api来触发DAG运行。有关更多信息,请单击此处


从Python中的第一个选项可能最适合您(这也是我个人过去所做的)。但是您理论上可以使用subprocess与Python中的CLI进行交互,或者使用类似requests的库从Python内部与REST API进行交互。


你知道我怎么才能从另一个项目中导入所有的DAG吗? - Oliver Robie
如果您正在尝试在另一个Python项目中执行此操作,则需要确保该项目可以访问相同的Airflow配置、数据库后端和DAG。然后,您可以导入airflow模块,就可以开始使用了。或者,您可以通过HTTP公开运行中的Airflow实例。然后,在第二个项目中使用类似requests的库向原始运行的Airflow堆栈发出HTTP请求以触发dag。 - Josh
我仍然无法通过“local_client”运行在测试中定义的DAG。如果您可以查看一下这个问题,我将不胜感激! - scubbo

0
在AWS MWAA Airflow 1.10.12上,我使用了基于Python的boto3库和REST POST请求的解决方案。
import boto3
import requests

def TriggerAirflowDAG(mwaa_environment, dag_id):
    client = boto3.client("mwaa")
    token = client.create_cli_token(Name=mwaa_environment)
    url = "https://{0}/aws_mwaa/cli".format(token["WebServerHostname"])
    body = f"trigger_dag {dag_id}"
    headers = {
        "Authorization": "Bearer " + token["CliToken"],
        "Content-Type": "text/plain"
    }
    return requests.post(url, data=body, headers=headers)

发起DAG运行的用户/角色必须具有AmazonMWAAAirflowCliAccess策略。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接