Airflow - 不在同一DAG文件夹中的Python文件

25

我正在尝试使用Airflow来执行一个简单的Python任务。

from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta


from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

如果我试一下,例如:

airflow test python_test print 2015-01-01

它有效!

现在我想把我的 def print_context(ds, **kwargs) 函数放到另一个 Python 文件中。所以我创建了另一个名为 simple_test.py 的文件并进行如下更改:

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)

现在我再试一次运行:

airflow test python_test print 2015-01-01

没问题!它仍然能工作!

但是如果我创建一个模块,例如名为SimplePython.py的worker模块,导入它(from worker import SimplePython),并尝试:

airflow test python_test print 2015-01-01

它会显示以下信息:

ImportError:找不到名为worker的模块

问题:

  1. 在DAG定义中导入模块是否可行?
  2. Airflow + Celery将如何在工作节点之间分发所有必要的Python源文件?

Note: I have kept the HTML tags and made the content more understandable.
4个回答

14
您可以按照以下方式打包DAG的依赖项:

https://airflow.apache.org/concepts.html#packaged-dags

为了实现这一点,您可以创建一个包含DAG(s)的zip文件,并将额外的模块解压缩到目录中。例如,您可以创建一个如下所示的zip文件:
my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

当使用CeleryExecutor时,您需要手动同步DAG目录,Airflow不会为您处理:
Airflow将扫描zip文件并尝试加载my_dag1.py和my_dag2.py。它不会进入子目录,因为这些被认为是潜在的包。

https://airflow.apache.org/configuration.html?highlight=scaling%20out%20celery#scaling-out-with-celery

工作人员需要访问其DAGS_FOLDER,并且您需要通过自己的方式同步文件系统。

你好 @ImDarrenG,我在使用打包的 DAGs 时遇到了问题。能否请您查看我的问题?谢谢! - Codious-JR

11

虽然在文档中提到的将您的 DAG 打包成 zip 是我见过的唯一受支持的解决方案,但您也可以导入位于 dags 文件夹内部的模块。如果您使用其他工具(如 Puppet 和 Git)自动同步 dags 文件夹,这将非常有用。

由于我不清楚您的目录结构,请参考以下基于典型 Python 项目结构的 DAG 文件夹示例:

└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root
        ├── my_dags  # python src root (usually named same as project)
           ├── my_test_globals.py  # file I want to import
           ├── dag_in_package.py 
           └── dags 
                └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

我已经省略了(必需的[1])__init__.py文件。请注意三个示例DAG的位置。您几乎肯定只会使用其中一个位置来存放所有您的DAG。我在这里包含它们所有只是为了举个例子,因为导入时不应该有影响。要从其中任何一个导入my_test_globals

from my_dags.my_dags import my_test_globals

我认为这意味着Airflow在Python路径设置为DAG目录下运行,因此可以将DAG文件夹的每个子目录视为Python包。在我的情况下,额外的中间项目根目录妨碍了典型的包内绝对导入。因此,我们可以像这样重新构建Airflow项目:

└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root & python src root
        ├── my_test_globals.py  # file I want to import
        ├── dag_in_package.py 
        ├── dags 
            └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

为了让导入的内容看起来符合我们的期望:

from my_dags import my_test_globals

很棒的答案。喜欢这个目录结构。 - Manuel

2
针对您的第二个问题:Airflow+Celery如何在工作节点之间分发所有必要的Python源文件?
根据文档:工作节点需要访问其DAGS_FOLDER,并且您需要通过自己的方式同步文件系统。常见的设置是将您的DAGS_FOLDER存储在Git存储库中,并使用Chef、Puppet、Ansible或您用于配置环境中的机器的任何其他工具在计算机之间同步它。如果所有的盒子都有一个共同的挂载点,那么在那里共享您的管道文件应该也可以工作。
参考链接:http://pythonhosted.org/airflow/installation.html?highlight=chef

1

关于您的第一个问题,是可行的。

我猜您应该在与SimplePython.py相同的目录下创建一个名为__init__.py的空文件(在您的情况下是worker目录)。通过这样做,worker目录将被认为是一个Python模块。

然后在您的DAG定义中,尝试from worker.SimplePython import print_context

在您的情况下,我猜最好编写一个Airflow插件,因为您可能希望升级Airflow核心项目而不删除自定义函数。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接