我已经在Airflow上工作了一段时间,与调度器无关的问题不多,但现在我遇到了一个问题。
基本上,我有一个脚本和dag用于执行任务,但是任务不会周期性地运行。相反,它需要在随机时间被激活。(外部方将告诉我们何时运行它。这可能在接下来的几个月中发生多次。)
是否有任何方式手动触发DAG? 也欢迎提供任何其他方向/建议。 谢谢。
我已经在Airflow上工作了一段时间,与调度器无关的问题不多,但现在我遇到了一个问题。
基本上,我有一个脚本和dag用于执行任务,但是任务不会周期性地运行。相反,它需要在随机时间被激活。(外部方将告诉我们何时运行它。这可能在接下来的几个月中发生多次。)
是否有任何方式手动触发DAG? 也欢迎提供任何其他方向/建议。 谢谢。
你有几个选项:
airflow trigger_dag <dag_id>
,查看文档。请注意,较新版本的Airflow使用语法airflow dags trigger <dag_id>
POST /api/experimental/dags/<dag_id>/dag_runs
,查看文档。TriggerDagRunOperator
,请参阅https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/trigger_dagrun/index.html#airflow.operators.trigger_dagrun.TriggerDagRunOperator中的文档以及https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py中的示例。如果这真的是100%手动进行的话,您可能会选择UI或CLI。如果您希望让外部方间接触发它,则可以使用API或操作符作为选项。记得在DAG上设置schedule_interval=None
。
因此,DAG 可以通过以下方式触发:
Using the REST API Reference(see documentation)
endpoint-
> POST /api/experimental/dags/<DAG_ID>/dag_runs
Using Curl:
curl -X POST
http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs
-H 'Cache-Control: no-cache'
-H 'Content-Type: application/json'
-d '{"conf":"{"key":"value"}"}'
Using Python requests:
import requests
response = requests.post(url, data=json.dumps(data), headers=headers)
Using the trigger DAG option present in the UI as mentioned by @Daniel
curl -X POST 'http://localhost:8080/api/v1/dags/<DAG_ID>/dagRuns' \
--header 'accept: application/json' \
--header 'Content-Type: application/json' \
--user '<user>:<password>' \ # If authentication is used
--data '{}'
POST /api/experimental/dags/<DAG_ID>/dag_runs
。使用此方法,您还可以传递DAG运行的配置参数。