这里有一个愚蠢的想法...
我在airflow中创建了(许多)DAG,它们可以正常工作。但是,我希望以某种方式将其打包起来,以便我可以运行单个DAG Run而无需安装airflow;也就是说,使其自包含,不需要所有的Web服务器、数据库等。
我通常使用trigger dag实例化新的DAG Run,并且注意到运行airflow的开销相当高(worker负载几乎为零,有时需要10秒钟左右才能排队依赖任务等)。
我对所有的日志记录等并不太关心。
这里有一个愚蠢的想法...
我在airflow中创建了(许多)DAG,它们可以正常工作。但是,我希望以某种方式将其打包起来,以便我可以运行单个DAG Run而无需安装airflow;也就是说,使其自包含,不需要所有的Web服务器、数据库等。
我通常使用trigger dag实例化新的DAG Run,并且注意到运行airflow的开销相当高(worker负载几乎为零,有时需要10秒钟左右才能排队依赖任务等)。
我对所有的日志记录等并不太关心。
您可以创建一个脚本来执行airflow操作,但这会丢失Airflow提供的所有元数据。您仍然需要将Airflow安装为Python包,但不需要运行任何Web服务器等。一个简单的示例可能如下所示:
from dags.my_dag import operator1, operator2, operator3
def main():
# execute pipeline
# operator1 -> operator2 -> operator3
operator1.execute(context={})
operator2.execute(context={})
operator3.execute(context={})
if __name__ == "__main__":
main()
首先,我建议您调整空气流量设置。
但如果这不是一个选项,另一种方法是将主要逻辑编写在DAG之外的代码中(这也是最佳实践)。对我来说,这使得本地测试代码更容易。
编写shell脚本很容易将几个进程绑定在一起。
您将无法获得操作器或依赖项的好处,但您可能可以通过脚本解决问题。如果您无法解决问题,只需使用Airflow。
dag = my_pipeline_dag()
if __name__ == "__main__":
dag.test()
或者如果你想指定执行日期
import pendulum # NOT datetime (doesn't work)
if __name__ == "__main__":
dag.test(execution_date=pendulum.datetime(2023, 6, 1, 10, 0, 0))
提示
pip install apache-airflow --upgrade
airflow db reset
如果导入的airflow模块无法导入,您可以对其进行重载。例如,如果您正在使用from airflow.decorators import dag, task
,则可以重载@dag
和@task
装饰器:
from datetime import datetime
try:
from airflow.decorators import dag, task
except ImportError:
mock_decorator = lambda f=None,**d: f if f else lambda x:x
dag = mock_decorator
task = mock_decorator
@dag(schedule=None, start_date=datetime(2022, 1, 1), catchup=False)
def mydag():
@task
def task_1():
print("task 1")
@task
def task_2(input):
print("task 2")
task_2(task_1())
_dag = mydag()