10得票2回答
Airflow - 破损的DAG - 超时

我有一个DAG用于执行连接到Postgres DB的函数,删除表格中的内容,并插入新的数据集。 我在本地进行尝试时发现,当我尝试运行此代码时,Web服务器需要很长时间才能连接,并且大多数情况下不成功。但是,在连接过程中似乎正在从后端执行查询。由于我有一个删除函数,因此我看到数据从表中被删除(...

10得票3回答
如何将参数传递给Airflow中的on_success_callback和on_failure_callback函数

我已经使用on_success_callback和on_failure_callback实现了成功和失败的电子邮件警报。 根据Airflow文档, 将一个上下文字典作为单个参数传递给此函数。 如何向这些回调方法传递另一个参数? 以下是我的代码from airflow.utils...

10得票2回答
如何在DEV和PROD环境之间迁移Airflow变量?

我们正在使用Airflow调度我们的数据管道,另外我们还在Airflow管理中添加了一些连接和变量。 在DEV环境中一切都运行良好,现在我们想要设置PROD环境。我们如何将这些值迁移到PROD环境中。

10得票2回答
Airflow回填新添加到DAG的任务

假设今天是2017年10月20日。我有一个已经成功运行到今天的dag。我需要添加一个任务,起始日期为2017年10月1日。如何使调度器自动触发从2017年10月1日到2017年10月20日的任务?

10得票2回答
Airflow任务能否在运行时动态生成DAG?

我有一个上传文件夹,里面会不定期地上传文件。对于每个上传的文件,我想生成一个特定于该文件的DAG。 我的第一个想法是使用FileSensor来监视上传文件夹,并在有新文件的情况下触发一个任务来创建单独的DAG。概念上如下: Sensor_DAG (FileSensor -> Crea...

9得票3回答
Airflow外部传感器在poking时卡住了。

我希望一个dag在另一个dag完成后启动。一种解决方案是使用外部传感器函数,下面您可以找到我的解决方案。我遇到的问题是依赖的dag被卡在poking阶段,我查看了这个答案并确保两个dag在同一时间表上运行,我的简化代码如下: 任何帮助都将不胜感激。 领导dag: from airflow i...

9得票5回答
Airflow dags and PYTHONPATH

我有一些DAG无法定位Python模块。在Airflow UI中,我看到了很多这样的消息变化。 Broken DAG: [/home/airflow/source/airflow/dags/test.py] No module named 'paramiko' 在文件中,我可以直接修改Py...

9得票2回答
Airflow传感器中的“reschedule”模式如何工作?

我有一个Airflow Http传感器,会调用REST端点,并检查API返回的JSON结构中是否有特定值。 sensor = HttpSensor( soft_fail=True, task_id='http_sensor_check', http_conn_id='...

9得票1回答
空气流量 - 如何仅一次“填充DagBag”

我的 DAG 需要约 50 秒才能解析,我只使用外部触发器来启动 DAG 运行,没有调度。我注意到 Airflow 想要频繁填充 DAG bag --> 在每个 trigger_dag 命令和后台中,它一直在检查 dags 文件夹并且似乎即时创建 .pyc 文件一旦新的 .py 文件部署...

9得票4回答
Airflow:为什么操作器需要一个 start_date?

我不理解为什么操作器(任务实例)需要一个“start_date”。我们传递给DAG的日期不应该已经足够吗? 另外,如果当前时间是UTC 2018年2月7日8:30 am,并且我将DAG的start_date设置为2月7日00:00 am,并使用cron表达式调度间隔为每天9:30 am (即...