问题 在Airflow中是否有一种方法可以创建一个工作流,使得任务B.* 的数量直到Task A完成之后才能确定?我已经查看了子DAG,但它似乎只能使用在Dag创建时就确定的静态一组任务。 DAG触发器是否可行?如果可以,能否提供一个示例。 我的问题是,在Task A完成之前,无法知道计...
我是Airbnb开源工作流/数据管道软件airflow的新用户。在启动Web UI后,会出现几十个默认示例dag。我试图删除这些dag的方法很多,但都失败了。 load_examples = False在airflow.cfg中设置。 文件夹lib/python2.7/site-packa...
我已经启动了Airflow Web服务器并计划了一些DAG。我可以在Web GUI上看到这些DAG。 如何从运行和Web GUI中显示的特定DAG中删除?有没有Airflow CLI命令可以做到这一点? 我找了一圈,但找不到一种简单的方法来删除已加载和计划的DAG。
我在这个论坛上是一个新手。但我已经在我们公司使用airflow玩了一段时间。如果这个问题听起来很蠢,对不起。我正在使用一堆BashOperators编写管道。基本上,对于每个任务,我想使用'curl'简单调用REST api。这就是我的管道的样子(非常简化版):from airflow imp...
我试图提供有用的信息,但我离数据工程师还很远。 我目前正在使用Python库pandas对我的数据执行一系列长时间的转换,其中有许多输入(目前是CSV和Excel文件)。输出是几个Excel文件。我希望能够执行定期监控的批处理作业,并进行并行计算(我指的是不像我使用pandas那样顺序执行)...
我希望能够创建一个符合下面示意图的Airflow条件任务。 预期的场景如下: 任务1执行 如果任务1成功,则执行任务2a 否则,如果任务1失败,则执行任务2b 最后执行任务3 上述所有任务均为SSHExecuteOperator。 我猜应该使用ShortCircuitOperator...
假设您有一个Airflow DAG,不适合回溯填充(backfill),这意味着在它运行一次后,快速地连续运行它将完全是无意义的。 例如,如果您正在从某个仅每小时更新一次的源加载数据到数据库中,那么回溯填充将只是一遍又一遍地导入相同的数据。 当您实例化一个新的每小时任务时,这将特别烦人,因...
我如何在Airflow UI上停止/终止正在运行的任务?我正在使用LocalExecutor。 即使我使用CeleryExecutor,我该如何终止/停止正在运行的任务?
在我的一些Apache Airflow安装中,即使调度程序似乎没有完全加载,也不会运行计划运行的DAG或任务。如何增加可以同时运行的DAG或任务的数量? 同样地,如果我的安装负载很高,并且我想限制Airflow工作程序提取排队任务的速度(例如为了减少资源消耗),我可以调整什么来减少平均负载?
在Airflow中写入日志的一种方法是从PythonOperator中返回一个字符串,就像这里第44行所示。 是否还有其他方式可以让我写入Airflow日志文件?我发现使用print语句无法保存到日志中。