我正在尝试根据用户输入在Airflow中生成动态工作流。我知道有基于文件和数据库的选项,但在所有这些情况下,工作流都不会直接依赖于用户输入。而且,在多个用户使用同一个dag的情况下,可能会出现问题。为了避免这些问题,我考虑将用户输入传递给子dag并生成工作流程。但是,子dag没有从ui传递用户输入的选项。
我猜使用变量是解决问题的好方法,但用户可能会覆盖彼此的更改(可能会出现一些问题)。
备选方案1:
Airflow在顶部具有REST API,支持dag触发功能。
请求示例:
curl -X POST \
'http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs' \
--header 'Cache-Control: no-cache' \
--header 'Content-Type: application/json' \
--data '{"conf":"{\"key\":\"value\"}"}'
数据部分可以存储一些用户输入,稍后将在Airflow操作器中访问。
备选方案2:
Airflow支持CLI界面,可用于触发DAG。您可以将额外的配置作为配置参数(-c
选项)指定。配置可以存储用户输入。
命令格式:
airflow trigger_dag [-h] [-sd SUBDIR] [-r RUN_ID] [-c CONF] [-e EXEC_DATE]
dag_id
根据文档,您可以在Airflow中使用Variable
。
Variables
是一种通用的方式,在Airflow中作为简单的键值存储来存储和检索任意内容或设置。变量可以从UI(管理 -> 变量)、代码或CLI中列出、创建、更新和删除。
您可以参考以下链接以进一步了解:
您可以使用xcom在Airflow中传递用户输入 => custom-xcom-backend documentation
使用airflow-rest-plugin,通过xcom
从内部获取输入并传递到子DAG。
有很多技巧可以完成同样的任务,但实际解决方案应该来自于 airflow 的动态任务,目前还不存在。希望我们能在未来版本的 airflow 中看到它。