在没有Apache Airflow的情况下运行Apache Airflow DAG

4

这里有一个愚蠢的想法...

我在airflow中创建了(许多)DAG,它们可以正常工作。但是,我希望以某种方式将其打包起来,以便我可以运行单个DAG Run而无需安装airflow;也就是说,使其自包含,不需要所有的Web服务器、数据库等。

我通常使用trigger dag实例化新的DAG Run,并且注意到运行airflow的开销相当高(worker负载几乎为零,有时需要10秒钟左右才能排队依赖任务等)。

我对所有的日志记录等并不太关心。

5个回答

4

您可以创建一个脚本来执行airflow操作,但这会丢失Airflow提供的所有元数据。您仍然需要将Airflow安装为Python包,但不需要运行任何Web服务器等。一个简单的示例可能如下所示:

from dags.my_dag import operator1, operator2, operator3

def main():
    # execute pipeline
    # operator1 -> operator2 -> operator3

    operator1.execute(context={})
    operator2.execute(context={})
    operator3.execute(context={})

if __name__ == "__main__":
    main()

1

首先,我建议您调整空气流量设置。

但如果这不是一个选项,另一种方法是将主要逻辑编写在DAG之外的代码中(这也是最佳实践)。对我来说,这使得本地测试代码更容易。

编写shell脚本很容易将几个进程绑定在一起。

您将无法获得操作器或依赖项的好处,但您可能可以通过脚本解决问题。如果您无法解决问题,只需使用Airflow。


1
似乎您主要关心的是闲置工人浪费资源,而不是Airflow本身的浪费。我建议在单个盒子上使用LocalExecutor运行Airflow。这样可以享受并发执行的好处,而无需管理工作人员的麻烦。
至于数据库-没有办法在不修改Airflow源代码的情况下删除数据库组件。一个替代方案是使用SQLite的SequentialExecutor,但这会删除运行并发任务的能力,并且不建议用于生产。

1

Airflow 2.5.x及以上版本的新功能

dag = my_pipeline_dag()

if __name__ == "__main__":
    dag.test()

或者如果你想指定执行日期

import pendulum  # NOT datetime (doesn't work)

if __name__ == "__main__":
    dag.test(execution_date=pendulum.datetime(2023, 6, 1, 10, 0, 0))

提示

  • 升级你的Airflow: pip install apache-airflow --upgrade
  • 你可能需要重置你的Airflow数据库: airflow db reset

0

如果导入的airflow模块无法导入,您可以对其进行重载。例如,如果您正在使用from airflow.decorators import dag, task,则可以重载@dag@task装饰器:

from datetime import datetime

try:
    from airflow.decorators import dag, task
except ImportError:
    mock_decorator = lambda f=None,**d: f if f else lambda x:x
    dag = mock_decorator
    task = mock_decorator


@dag(schedule=None, start_date=datetime(2022, 1, 1), catchup=False)
def mydag():

    @task
    def task_1():
        print("task 1")

    @task
    def task_2(input):
        print("task 2")

    task_2(task_1())

_dag = mydag()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接