如何在Python虚拟环境中使用Airflow Docker ExternalPythonOperator?

3

情况

  • 自2022年9月19日发布Apache Airflow 2.4.0以来
  • Airflow支持ExternalPythonOperator
  • 我已经向主要贡献者提出了请求,并且应该能够将2个python虚拟环境添加到Airflow Docker 2.4.1的基本映像中,并能够在DAG内运行单个任务。

目标

  • 我的目标是使用从本地requirements.txt构建的多个主机python虚拟环境。
  • 使用ExternalPythonOperator运行它们
  • 我的每个DAG都只执行定时python函数

我想请求

  • 示例文件如何创建一个独立的有意识存在的python虚拟环境,通过基本的docker Airflow 2.4.1映像和:
    • docker-compose.yml #最佳选项,因此我只需要在官方映像上使用docker-compose
    • Dockerfile #第二好的选择,但因为我需要使用docker-compose.yml文件对官方映像进行一些处理,所以需要进行docker组合

系统

  • 2.4.1可用的Docker映像。
  • ubuntu 20.04 LTS

知识空白

我不想要这个

  • 使用PythonVirtualenvOperator动态创建venvs。(已成功执行此操作,但我的DAG太轻或导入太多,因此不适合使用)
  • 我有1个Python函数/DAG,因此我不需要这个 ->“请注意,虚拟环境是针对任务而不是DAG的。您目前无法解析DAG并在不同的虚拟环境中执行整个DAG - 您可以在其中执行单个Python *任务。将为“整个DAG”实现单独的运行时环境可能会在2.4或2.6中实现,作为https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-46+Runtime+isolation+for+airflow+tasks+and+dag+parsing"的结果。

终端命令

docker build -t my-image-apache/airflow:2.4.1 .

之后我会运行以下命令,但第一步失败了。
docker-compose up

我的文件

docker-compose.yml

https://airflow.apache.org/docs/apache-airflow/2.4.1/docker-compose.yaml

这是一个关于IT技术的文件,其中包括了docker-compose.yml文件的链接。请点击链接以获取更多信息。
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:2.4.1
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-my-image-apache/airflow:2.4.1}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeted on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:

Dockerfile(我混乱的整理)

FROM apache/airflow:2.4.1-python3.8

# https://pythonspeed.com/articles/activate-virtualenv-dockerfile/
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# Install dependencies:
COPY requirements.txt .
RUN pip install -r requirements.txt

# Run the application:
# COPY myapp.py .
# CMD ["python", "myapp.py"]


# RUN python3 -m venv /path/to/new/virtual/environment_1 && \
#     /path/to/new/virtual/environment_1/bin/python \
#     -m pip install requirements.txt
# RUN python3 -m venv /path/to/new/virtual/environment_2 && \
#     /path/to/new/virtual/environment_2/bin/python \
#     -m pip install my_requirements_2.txt

错误

我之前在Dockerfile中有过Python环境,例如:

FROM python:3.9-slim-bullseye

RUN python3 -m venv /opt/venv

# Install dependencies:
COPY requirements.txt .
RUN . /opt/venv/bin/activate && pip install -r requirements.txt

# Run the application:
COPY myapp.py .
CMD . /opt/venv/bin/activate && exec python myapp.py

Dockerfile:但是在Airflow中它就无法正常工作。
FROM apache/airflow:2.4.1-python3.8
COPY requirements.txt .
RUN python3 -m venv /opt/airflow/virtual_1 && \
/opt/airflow/virtual_1/bin/python \
-m pip install requirements.txt

错误

 => ERROR [stage-1 2/2] RUN python3 -m venv /opt/airflow/virtual_1 && /opt/airflow/virtual_1/bin/python -m pip install requirements.txt 

我尝试过的其他事情

1.)

FROM apache/airflow:2.4.1-python3.8
RUN python3 -m venv /opt/airflow
# Install dependencies:
COPY requirements.txt .
RUN /opt/airflow/venv/bin/pip install -r requirements.txt

命令 - docker build -t my-image-apache/airflow:2.4.1 .

错误 => 错误 [4/4] 运行 /opt/airflow/venv/bin/pip install -r requirements.txt

2.)

FROM apache/airflow:2.4.1-python3.8
COPY requirements.txt .
RUN python3 -m venv && \
    /venv/bin/python install -m pip requirements.txt

错误 => 错误 [3/3] 运行 python3 -m venv && /venv/bin/python install -m pip requirements.txt

1个回答

0

比Airflow更简单的替代方案

如果您对此并没有太多投入,我宁愿不推荐使用Airflow,因为有易于使用的替代方案:

  1. Mage ai - https://github.com/mage-ai/mage-ai
  2. jupyter调度器 - https://www.google.com/search?client=firefox-b-d&q=jupyter+schduler
  3. !!已付费 - https://docs.qubole.com/en/latest/user-guide/notebooks-and-dashboards/notebooks/jupyter-notebooks/scheduling-jupy-notebooks.html
  4. jupyterlab-scheduler 0.1.5 - https://pypi.org/project/jupyterlab-scheduler/
  5. https://pypi.org/project/notebooker/
  6. notebooker 0.4.4 - https://pypi.org/project/notebooker/
  7. papermill - https://pypi.org/project/papermill/

如何使用Airflow进行操作

1.) 原始Dockerfile

[可更改的] 这是原始镜像,您可以从中获取 - https://hub.docker.com/r/apache/airflow/dockerfile

2.) 原始镜像

[可更改的] 是从原始 Dockerfile 创建的编译后的镜像 - https://hub.docker.com/layers/apache/airflow/latest/images/sha256-5015db92023bebb1e8518767bfa2e465b2f52270aca6a9cdef85d5d3e216d015?context=explore

3.) 我的requirements.txt

requirements.txt - 不必在其中安装Airflow。

pandas==1.3.0
numpy==1.20.3

3.) 我的 Dockerfile

这个文件会拉取原始镜像并进行扩展

FROM apache/airflow:2.4.1-python3.8

# Compulsory to switch parameter
ENV PIP_USER=false

#python venv setup
RUN python3 -m venv /opt/airflow/venv1

# Install dependencies:
COPY requirements.txt .

# --user   <--- WRONG, this is what ENV PIP_USER=false turns off
#RUN /opt/airflow/venv1/bin/pip install --user -r requirements.txt  <---this is all wrong
RUN /opt/airflow/venv1/bin/pip install -r requirements.txt
RUN /opt/airflow/venv1/bin/pip install 'apache-airflow==2.4.1' --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.4.1/constraints-3.8.txt"

ENV PIP_USER=true

4.) 终端命令

(必须在与您的文件相同的库中,文件名必须为“Dockerfile”)

docker build -t my-image-apache/airflow:2.4.1 .

5.) DAG 文件

mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
  • 如果您已经生成了".env"文件,您可以在其中设置新的用户名和密码,以供主用户使用。

6.) ex test DAG

  • !!! dag_id,task_id - 必须唯一!!
  • !! 如果您将Dag文件拖入本地dags文件夹中,则自动添加运行Web服务器,只需5-10分钟,如果您修改现有文件,则Web服务器几乎立即刷新。
  • !#python = os.fspath(sys.executable)-->' / opt / airflow / venv1 / bin / python3' <--必须指向Python虚拟环境中的可执行Python文件
"""
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
"""
from __future__ import annotations

import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint

import pendulum

from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)

PYTHON = sys.executable

BASE_DIR = tempfile.gettempdir()

with DAG(
    dag_id='test_external_python_venv_dag2',
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['my_test'],
) as dag:
    #@task.external_python(task_id="test_external_python_venv_task", python=os.fspath(sys.executable))
    # /opt/airflow/venv1/bin/python3  <-- have to point to an executable python file in thy python virtual environemnt
    @task.external_python(task_id="test_external_python_venv_task", python='/opt/airflow/venv1/bin/python3')
    def test_external_python_venv_def():
        """
        Example function that will be performed in a virtual environment.
        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        import sys
        from time import sleep
        ########## MY CODE ##########
        import numpy as np
        import pandas as pd
        d = {'col1': [1, 2], 'col2': [3, 4]}
        df = pd.DataFrame(data=d)
        print(df)
        a = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])
        print(a)
        #a= 10
        return a
        ########## XXXXX MY CODE XXXXX ##########

        print(f"Running task via {sys.executable}")
        print("Sleeping")
        for _ in range(4):
            print('Please wait...', flush=True)
            sleep(1)
        print('Finished')

    external_python_task = test_external_python_venv_def()

7.) docker-compose.yml

官方原始的docker-compose.yml文件 https://airflow.apache.org/docs/apache-airflow/2.4.1/docker-compose.yaml,修改此部分:

## Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-my-image-apache/airflow:2.4.1} #<- this is because of my terminal command above section
#  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.1} <--- THIS WAS THE ORIGINAL
  environment:
    #.... many staff here originaly in this environment section.....
    AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true' # <--ADD THIS. This is internal communication for airflow
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs

9.) 构建容器镜像

(必须在与您的文件相同的库中,文件名必须为“docker-compose.yml”)

docker-compose up

或通过以下命令从终端分离:

docker-compose up -d

10.) 日志

如果您想在Mac和Windows上查看容器的日志,则Docker APP GUI可以让您这样做。在Linux上,您可以使用以下命令:

docker logs -f CONTATINER_ACTUAL_ID

您可以按下以下快捷键退出而不关闭容器:

CTRL + c

11.) 关闭容器:

  • 正常方式 docker-compose down
  • 或者如果您在日志中,请按CTRL + C

!!! 停止并删除容器,删除数据库数据卷并下载镜像,运行。

docker-compose down --volumes --rmi all


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接