通过文件监视器在airflow中触发dag

4
我使用看门狗API找到了这篇文章,它似乎正是我所需要的:https://medium.com/@phanikumaryadavilli/hacking-apache-airflow-to-trigger-dags-based-on-filesystem-events-25f822fd08c3
(代码不是由我编写的)
import os
import time
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime, timedelta


dag = DAG(dag_id="test_trigger_dag_operator",default_args={"owner":"Airflow", "start_date":datetime(2020,3,9)})

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dag_run_operator",
    trigger_dag_id="dummy_operator",
    conf={"message": "Hello World"},
    dag=dag,
    )

class Handler(FileSystemEventHandler):
    def on_created(self, event):
        if event.event_type == 'created':
            print("file created")
            print('Executing the dag')
            trigger

def main():
    observer = Observer()
    event_handler = FileSystemEventHandler()
    observer_path = os.getcwd()
    observer.schedule(Handler(), observer_path, recursive=False)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    
    observer.join()

if __name__ == '__main__':
    main()

很遗憾,使用作者的代码,DAG 的唯一作用是无条件触发目标 DAG,而 main() 从未被调用,也就没有文件监视。

我对代码进行了轻微修改,在 TriggerDagRunOperator 中添加了 python_callable 属性,并在 main 中添加了必要的参数 (context, dag_run_obj)。

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dag_run_operator",
    trigger_dag_id="dummy_operator",
    conf={"message": "Hello World"},
    python_callable: main,
    dag=dag,
    )

并且移除

if __name__ == '__main__':
    main()

部分。

现在文件监视器正在工作,但是目标DAG无论如何都会被触发,并且调度程序一旦开始DAG就会挂起。(这与 while (true) 的预期相同)我该如何以可行的方式使用提供的代码?

1个回答

2
Airflow有自己的服务名为DagBag Filling,它会解析您的dag并将其放入DagBag中。DagBag是您在UI和元数据DB上看到的dag集合。
在对文件进行DagBag填充(解析其中的任何DAG)时,它实际上永远不会结束!您正在此DAG文件定义中运行该监视器。
为避免这种情况,您需要实现Sensor。
Sensor-等待(轮询)一定时间、文件、数据库行、S3密钥等的操作符...
这将完全按照您的要求执行任务,但防止环境崩溃。这本质上将是在自定义传感器的重写poke()方法中重新实现您的主要函数。
您可以检查contrib存储库中的任何现有传感器,或根据您的需求编写自定义传感器。
另外,如果您只想从应用程序触发dag,可以通过REST API提交POST请求到那个dagid。
两种实现都可以解决您的问题。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接