Python 3.6版本的Airflow使用需要2.7版本Operator。

4

我目前正在使用Python 3.6.5运行airflow(1.9.0)实例。我有一个手动的工作流,我想将其移动到DAG中。这个手动的工作流现在需要用python 2和3编写的代码。我们将DAG简化为3个步骤:

  1. 处理数据并设置机器学习培训的数据流作业
  2. TensorFlow机器学习训练作业
  3. 使用Python 3代码编写的其他Python操作器

数据流作业是用Python 2.7编写的(由Google要求),TensorFlow模型代码是用Python 3编写的。在airflow 1.9.0中查看"MlEngineTrainingOperator",有一个python_version参数,它设置了“在训练中使用的Python版本”。

问题:

  • 我可以在工作环境中动态指定特定的Python版本吗?
  • 我是否必须仅在Python 2.7上安装airflow才能运行步骤1)?
  • 我可以在Python 2上运行的MlEngineTraining上提交Python 3中的TensorFlow模型代码吗?
  • 我是否必须将我的第3项操作器重写为Python 2?
2个回答

阿里云服务器只需要99元/年,新老用户同享,点击查看详情
4

在worker上动态指定Python版本的方法不存在。但是,如果您使用Celery executor,则可以在不同服务器/虚拟机上运行多个worker或在不同虚拟环境中运行它们。

您可以让一个worker运行Python 3,另一个运行2.7,并使每个worker监听不同的队列。有三种方法可以实现:

  • 在启动worker时添加-q [queue-name]标志
  • 设置AIRFLOW__CELERY__DEFAULT_QUEUE的环境变量
  • 更新airflow.cfg中[celery]下的default_queue

然后,在任务定义中指定queue参数,根据任务需要运行的Python版本更改队列。

我不熟悉MLEngineOperator,但是您可以在PythonOperator中指定python_version,这应该会在该版本的virtualenv中运行它。或者,您可以使用BashOperator将要运行的代码写入不同的文件,并使用绝对路径指定要使用的python命令来运行它。

无论如何运行任务,您只需要确保DAG本身与您正在运行它的Python版本兼容即可。例如,如果您要在不同的Python版本中启动airflow worker,则DAG文件本身需要兼容Python 2和3。DAG可以具有它使用的附加文件依赖项的版本不兼容性。


因此,MLEngineTrainingOperator实际上从不调用任何Python代码,只是使用Python模块的HTTP请求。但是,DataFlowPythonOperator继承自BaseOperator,并使用DataFlowHook,该钩子调用start_python_dataflow,该方法没有区分不同的Python版本。我认为使用不同的celery worker似乎是可行的方法。 - Andrew Cassidy
在工作节点上执行的操作员需要在该环境中满足它们的依赖关系。例如,如果使用HiveOperator,则需要在该计算机上安装hive CLI,或者如果使用MySqlOperator,则所需的Python库需要以某种方式在PYTHONPATH中可用...那么Airflow库是如何在Python 2和3上运行的?我是否需要处理一些在Python 2中具有不同代码的操作员? - Andrew Cassidy
我猜你的意思是我需要明确地为此做准备... 我只是想知道大多数或所有的Airflow操作器是否都可以在Python 2和3上运行... - Andrew Cassidy
在Python 2中运行我的DAG目前给了我以下错误信息:"File "/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1588, in process return loads(value) ValueError: unsupported pickle protocol: 4"。 - Andrew Cassidy
大多数运算符将使用2或3个。我建议在Python 3下运行airflow。然后,您可以使用PythonVirtualenvOperator(https://github.com/apache/incubator-airflow/blob/e9babff4eb3334b0d71cda31c2f6cbfe7b741389/airflow/operators/python_operator.py#L200) 或设置您的工作程序安装Python 2但不在您的路径中,然后使用BashOperator指定python 2的绝对路径(即/path/to/python2 /path/to/your_code_that_requires_py2)以在Py2中执行脚本。 - cwurtz

1

首先,你不能在通用的 Python 3 Airflow 集群上运行 Python 2 Airflow 工作程序:

Airflow 使用 SQLAlchemy(我认为是为了读写有关 DAG 的元数据到数据库中)。当你在工作程序上运行 DAG 时,它将从数据库中读取有关该 DAG 的 pickled 信息。如果你的其他非工作程序组件使用的是 Python 3,则它们将以 pickle 4 的形式写入数据库,而工作程序将尝试以 Python 2 的形式从数据库中读取。

特别地,在 SQLAlchemy 中查看 sqltypes.py:

class PickleType(TypeDecorator):
    """Holds Python objects, which are serialized using pickle.

    PickleType builds upon the Binary type to apply Python's
    ``pickle.dumps()`` to incoming objects, and ``pickle.loads()`` on
    the way out, allowing any pickleable Python object to be stored as
    a serialized binary field.

    To allow ORM change events to propagate for elements associated
    with :class:`.PickleType`, see :ref:`mutable_toplevel`.

    """

    impl = LargeBinary

    def __init__(self, protocol=pickle.HIGHEST_PROTOCOL,
                 pickler=None, comparator=None): 

然后在compat.py中进行最终的序列化,这个过程最终会在sqltypes.py中完成。

py36 = sys.version_info >= (3, 6)
py33 = sys.version_info >= (3, 3)
py35 = sys.version_info >= (3, 5)
py32 = sys.version_info >= (3, 2)
py3k = sys.version_info >= (3, 0)
py2k = sys.version_info < (3, 0)
py265 = sys.version_info >= (2, 6, 5)
jython = sys.platform.startswith('java')
pypy = hasattr(sys, 'pypy_version_info')
win32 = sys.platform.startswith('win')
cpython = not pypy and not jython  # TODO: something better for this ?

import collections
next = next

if py3k:
    import pickle
else:
    try:
        import cPickle as pickle
    except ImportError:
        import pickle
此外,airflow 中的 donot_pickle = True 似乎对此没有影响???也许是因为根据 这里 的说明,它只在回填中相关?

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,