11得票2回答
Airflow:当变量不存在时,在代码中设置默认值而不会抛出异常

我有一个小问题,我想要做典型的条件语句,就像 setting_x = Variable.get('setting_x') variable = setting_x if setting_x else 0 但由于Airflow模型在键不存在时会抛出异常,因此如果不使用trycatch,就...

11得票1回答
基于外部文件的Airflow动态任务

我正在从外部文件中读取元素列表,并循环遍历这些元素以创建一系列任务。 例如,如果文件中有2个元素 - [A,B]。 那么将会有2个任务系列:A1 -> A2 .. B1 -> B2 ... 这个读取元素的逻辑并不是任何任务的一部分,而是在DAG本身中。因此,调度程序会在阅读DAG文...

11得票2回答
如何在Airflow中重启DAG?

我有一个包含许多步骤的DAG,由于数据库关闭,它在中途停止了。我希望DAG能够从上次停止的地方继续执行,但是我只能逐个启动DAG的各个任务。是否有一种方法可以告诉Airflow基于已完成任务的成功状态来从上次停止处启动DAG?以下是一个示例,其中第一个任务已经完成,其余任务已排队或没有状态: ...

11得票1回答
谷歌云Composer网页版中的DAGs无法点击,但在本地Airflow上工作正常

我正在使用Google Cloud Composer(谷歌云平台上的托管Airflow)版本为composer-0.5.3-airflow-1.9.0,Python 2.7。然而,我遇到了一个奇怪的问题:在导入我的DAG后,它们从Web UI上是无法点击的(没有"Trigger DAG"、"G...

11得票2回答
如何在Airflow操作器中跳过任务?

有没有一种方式可以让Airflow从PythonOperator跳过当前任务?例如:def execute(): if condition: skip_current_task() task = PythonOperator(task_id='task', pytho...

11得票3回答
如何在Airflow中设置SLA?

我想在传感器操作员中设置SLA。这方面的文档不是太清楚。因此,我使用S3KeySensor测试了一下,该操作员在查找不存在的文件。我将SLA设置为30秒,并希望在30秒后在UI中看到记录-在SLA未达标中-但这并没有发生。我做错了什么?inputsensor = S3KeySensor( ...

11得票7回答
Airflow的sla_miss_callback函数未触发

我一直在尝试触发slack消息回调来响应SLA超时,但发现: SLA超时成功在Airflow Web UI的slamiss/list/中注册。 on_failure_callback执行成功。 然而,sla_miss_callback函数本身却永远不会被触发。 我尝试过: 在de...

11得票2回答
基于事件触发,当文件被投放到S3存储桶时运行Airflow任务

是否可能仅在特定事件发生时运行Airflow任务,例如将文件放入特定的S3存储桶中。类似于AWS Lambda事件。 有S3KeySensor,但我不知道它是否能做到我想要的(仅在事件发生时运行任务)。 以下是示例,以使问题更加清晰: 我有一个传感器对象,如下所示:sensor = S3...

10得票5回答
如何修复Airflow DAG日志中出现的“无法从工作器获取日志文件。不支持的URL协议”错误?

我正在使用这个镜像 apache/airflow:2.1.0 通过docker运行Airflow。 请参考这个 线程 查看我面临的最初错误。 目前,我可以运行之前已存在的DAG。 然而,当我添加新的DAG时,日志文件中会出现以下错误。 我相信这不是内存或计算问题。 *** Log file...

10得票2回答
Airflow - 在同一个DAG中使用TaskGroup和PythonBranchOperator

我目前正在使用Airflow Taskflow API 2.0。 我在使用TaskGroup和BranchPythonOperator的组合时遇到了问题。 以下是我的代码: import airflow from airflow.models import DAG from airflow...