我使用看门狗API找到了这篇文章,它似乎正是我所需要的:https://medium.com/@phanikumaryadavilli/hacking-apache-airflow-to-trigger-dags-based-on-filesystem-events-25f822fd08c3。
(代码不是由我编写的)
(代码不是由我编写的)
import os
import time
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime, timedelta
dag = DAG(dag_id="test_trigger_dag_operator",default_args={"owner":"Airflow", "start_date":datetime(2020,3,9)})
trigger = TriggerDagRunOperator(
task_id="test_trigger_dag_run_operator",
trigger_dag_id="dummy_operator",
conf={"message": "Hello World"},
dag=dag,
)
class Handler(FileSystemEventHandler):
def on_created(self, event):
if event.event_type == 'created':
print("file created")
print('Executing the dag')
trigger
def main():
observer = Observer()
event_handler = FileSystemEventHandler()
observer_path = os.getcwd()
observer.schedule(Handler(), observer_path, recursive=False)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == '__main__':
main()
很遗憾,使用作者的代码,DAG 的唯一作用是无条件触发目标 DAG,而 main()
从未被调用,也就没有文件监视。
我对代码进行了轻微修改,在 TriggerDagRunOperator
中添加了 python_callable
属性,并在 main 中添加了必要的参数 (context, dag_run_obj
)。
trigger = TriggerDagRunOperator(
task_id="test_trigger_dag_run_operator",
trigger_dag_id="dummy_operator",
conf={"message": "Hello World"},
python_callable: main,
dag=dag,
)
并且移除
if __name__ == '__main__':
main()
部分。
现在文件监视器正在工作,但是目标DAG无论如何都会被触发,并且调度程序一旦开始DAG就会挂起。(这与 while (true)
的预期相同)我该如何以可行的方式使用提供的代码?