如何在虚拟环境中使用Apache Airflow?

10

我对使用Apache Airflow相当陌生。我使用PyCharm作为我的IDE,创建了一个项目(Anaconda环境),并创建了包含DAG定义和Bash操作器的Python脚本。当我打开Airflow Web服务器时,我的DAGS未显示,只显示默认示例DAGS。我的AIRFLOW_HOME变量包含~/airflow。因此,我将Python脚本存储在那里,现在显示出来了。

如何在项目环境中使用它?

我需要在每个项目开始时更改环境变量吗?

是否有一种方法可以为每个项目添加特定的Airflow主目录?

我不想将我的DAGs存储在默认的Airflow目录中,因为我想将其添加到我的Git仓库中。请帮我解决这个问题。

2个回答

21
您可以使用以下格式的环境变量来设置/覆盖在${AIRFLOW_HOME}/airflow.cfg中指定的airflow选项:$AIRFLOW__{SECTION}__{KEY}(请注意双下划线)。这里是一个链接到airflow文档。因此,您可以简单地执行以下操作
export AIRFLOW__CORE__DAGS_FOLDER=/path/to/dags/folder

然而,对于不同的项目这样做既繁琐又容易出错。作为替代方案,您可以考虑使用pipenv来管理虚拟环境,而不是使用Anaconda。这里有一篇nice guide关于pipenv和它解决的问题。其中一个pipenvdefault features是在启用虚拟环境的情况下生成shell时自动加载.env文件中定义的变量。因此,以下是使用pipenv的工作流程示例:
cd /path/to/my_project

# Creates venv with python 3.7 
pipenv install --python=3.7 Flask==1.0.3 apache-airflow==1.10.3

# Set home for airflow in a root of your project (specified in .env file)
echo "AIRFLOW_HOME=${PWD}/airflow" >> .env

# Enters created venv and loads content of .env file 
pipenv shell

# Initialize airflow
airflow initdb
mkdir -p ${AIRFLOW_HOME}/dags/

注意:我会在最后解释为什么使用Flask==1.03,这是因为pipenv检查子依赖项是否兼容以确保可重现性。
所以经过这些步骤,您将获得以下项目结构。
my_project
├── airflow
│   ├── airflow.cfg
│   ├── airflow.db
│   ├── dags
│   ├── logs
│   │   └── scheduler
│   │       ├── 2019-07-07
│   │       └── latest -> /path/to/my_project/airflow/logs/scheduler/2019-07-07
│   └── unittests.cfg
├── .env
├── Pipfile
└── Pipfile.lock

现在,当您第一次初始化Airflow时,它将创建${AIRFLOW_HOME}/airflow.cfg文件,并使用/扩展${AIRFLOW_HOME}/dags作为dags_folder的值。如果您仍需要不同的dags_folder位置,则可以再次使用.env文件。
echo "AIRFLOW__CORE__DAGS_FOLDER=/different/path/to/dags/folder" >> .env

因此,您的.env文件将如下所示:
AIRFLOW_HOME=/path/to/my_project/airflow
AIRFLOW__CORE__DAGS_FOLDER=/different/path/to/dags/folder

我们取得了什么成就以及为什么这将完美地发挥作用

  1. 由于您在虚拟环境中安装了 airflow,因此需要激活它才能使用 airflow
  2. 由于您使用了 pipenv 进行安装,因此需要使用 pipenv shell 来激活虚拟环境
  3. 由于您使用了 pipenv shell,因此您总是会获得定义在 .env 中的变量导出到您的虚拟环境中。除此之外,pipenv 仍然是一个子shell,因此当您退出时,所有额外的环境变量也会被清除。
  4. 使用 airflow 的不同项目会有不同的日志文件等位置。

pipenv 的附加说明

  1. 要将使用 pipenv 创建的虚拟环境用作 IDE 的项目解释器,请使用 pipenv --py 提供的路径。
  2. 默认情况下,pipenv 会像 conda 一样在相同的全局位置创建所有虚拟环境,但您可以通过将 export PIPENV_VENV_IN_PROJECT=1 添加到您的 .bashrc(或其他 rc)来更改该行为,以在项目的根目录中创建 .venv。然后,PyCharm 将能够在进入项目解释器设置时自动选择它。

关于使用 Flask==1.0.3 的注意事项

PyPi上的Airflow 1.10.3依赖于flask>=1.0, <2.0jinja2>=2.7.3, <=2.10.0。今天,当我测试代码片段时,可用的最新版本flask1.1.0,它依赖于jinja2>=2.10.1。这意味着虽然pipenv可以安装所有必需的软件,但它无法锁定依赖关系。因此,为了干净地使用我的代码示例,我必须指定需要与airflow要求兼容的jinja2版本的flask版本。但是不用担心,GitHub上的最新版本airflow已经解决了这个问题。

使用pipenv和virtualenv有显著的区别吗?我可以使用任何一个,还是使用pipenv在使用airflow时具有显着优势? - Command
pipenv 构建在 pipvirtualenv 之上,并在其底层使用它们。在您的情况下,优势主要来源于内置的能力,可以使用 *.env 文件来为特定项目定义 AIRFLOW_HOME ,因为在 .bashrc(或类似文件)中进行定义仍然是全局的。 - Ilya Kisil
谢谢。我该如何安装最新版本的Airflow以解决这个问题?是否有类似于“pip install --upgrade apache-airflow”的等效方法来完成这个操作,还是我需要手动从GitHub下载? - Command
看起来,airflow 几个小时前在 pypi 上发布了 1.10.4 版本。我刚刚尝试了 pipenv install apache-airflow,一切都正常。通常情况下,我希望 pipenv 在传递选项方面与 pip 类似,但最好查阅他们的文档。 - Ilya Kisil
我按照您说的做了一切,创建了一个名为test_dag.py的DAG文件,并将其放在dags目录中。我现在在我的pipenv shell中,使用airflow webserver -p 8080实例化Web服务器。它可以正常工作。但是我没有看到我的test_dag。相反,我看到了许多其他的DAG,例如
  • example_bash_operator
- Command
显示剩余2条评论

-1

编辑 airflow.cfg 文件并设置:

load_examples = False
dags_folder = /path/to/your/dag/files

如果您的Airflow目录未设置为默认值,则应设置此环境变量。如果每次更改都很麻烦,只需在您的PyCharm项目配置或本地操作系统(~/.bashrc)中设置即可。
我的建议是编写一个小脚本以在本地模式下执行Airflow。您应该单独执行Airflow Web服务器、调度程序和工作进程。
对我来说,在小型开发机器上运行Airflow服务更加方便,考虑一下这个建议。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接