如何在虚拟环境中运行Airflow PythonOperator

22

我有几个Python文件,目前是使用BashOperator执行的。这使我可以轻松选择Python虚拟环境。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
   'owner': 'airflow',
    'depends_on_past': False,
    ...}

dag = DAG('python_tasks', default_args=default_args, schedule_interval="23 4 * * *")

t1 = BashOperator(
                 task_id='task1',
                bash_command='~/anaconda3/envs/myenv/bin/python 
                              /python_files/python_task1.py',
                 dag=dag)

我该如何使用PythonOperator实现类似的功能?

from airflow.operators.bash_operator import PythonOperator
import python_files.python_task1

python_task = PythonOperator(
              task_id='python_task',
              python_callable=python_task1.main,
             dag=dag)

我假设PythonOperator将使用系统Python环境。我发现Airflow有PythonVirtualenvOperator,但这似乎是通过使用指定的要求即时创建新的虚拟环境来工作的。我更喜欢使用已经正确配置的现有虚拟环境。

我该如何使用指定的Python路径运行PythonOperator?


2
PythonOperator只是运行可调用对象。您需要编写一个专门的操作器类,它是PythonOperator类的子类,以使用预定义的Python虚拟环境。 - Oluwafemi Sule
3个回答

14

我的解决方法是使用Bash操作符调用 /path/to/project/venv/bin/python my.py


11
首先要注意:通常情况下你不应该依赖预先存在的资源来创建自己的Operators。你的Operators应该是可移植的,因此使用长期存在的virtualenvs与此原则有些违背。话虽如此,这并不是什么大问题,就像你必须在全局环境中预先安装软件包一样,你可以预先准备一些环境。或者,你可以让Operator创建环境,后续的Operators可以重复使用它——我认为这是最简单、但也是最危险的方法。

实现"virtualenv缓存"应该不难。阅读PythonVirtualenvOperator的执行方法的实现:

def execute_callable(self):
    with TemporaryDirectory(prefix='venv') as tmp_dir:
        ...
        self._execute_in_subprocess(
            self._generate_python_cmd(tmp_dir,
                                      script_filename,
                                      input_filename,
                                      output_filename,
                                      string_args_filename))
        return self._read_result(output_filename)

看起来似乎它没有显式地删除虚拟环境(它依赖于 TemporaryDirectory 来完成此操作)。您可以子类化 PythonVirtualenvOperator 并仅使用自己的上下文管理器来重用临时目录:

import glob

@contextmanager
def ReusableTemporaryDirectory(prefix):
    try:
        existing = glob.glob('/tmp/' + prefix + '*')
        if len(existing):
            name = existing[0]
        else:
            name = mkdtemp(prefix=prefix)
        yield name
    finally:
        # simply don't delete the tmp dir
        pass

def execute_callable(self):
    with ReusableTemporaryDirectory(prefix='cached-venv') as tmp_dir:
        ...

当然,您可以在 ReusableTemporaryDirectory 中摆脱 try-finally ,并恢复常规的 suffix dir 参数,我进行了最小的更改,以便易于与原始的 TemporaryDirectory 类进行比较。

有了这个,您的virtualenv不会被丢弃,但是新的依赖项最终将由操作员安装。


0
使用PythonVirtualenvOperator。您需要提供要运行的Python函数和虚拟环境的requirements.txt。

1
目前你的回答不够清晰。请编辑并添加更多细节,以帮助其他人理解它如何回答所提出的问题。你可以在帮助中心找到有关如何撰写好答案的更多信息。 - Community

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接