通过Airflow Docker容器执行Windows Python脚本

3
我正在Windows上工作,通过Docker设置了Airflow。我在Windows上有许多Python脚本,可以从多个位置(如SSH连接、Windows文件夹等)读取和写入。复制所有这些输入到我的Docker镜像中需要大量工作,因此我希望让Airflow执行这些脚本,就好像它们在Windows中运行一样。
如果可能的话,该怎么做呢?
以下是我作为DAG运行的脚本:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2018, 11, 27),
    'email': ['test@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('Helloworld', default_args=default_args)

###########################################################
# Here's where I want to execute my windows python script #
###########################################################
t1=PythonOperator(dag=dag,
               task_id='my_task_powered_by_python',
               provide_context=False,
               python_callable=r"C:\Users\user\Documents\script.py")

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Hello World from Task 2"',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Hello World from Task 3"',
    dag=dag)

t4 = BashOperator(
    task_id='task_4',
    bash_command='echo "Hello World from Task 4"',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream(t2)
t4.set_upstream(t3)
1个回答

2
您将Python脚本路径传递给了PythonOperator,PythonOperator正在寻找Python代码对象,而不是脚本文件路径。
您有两种不同的选择来调用这些Python脚本。 通过Bash和BashOperator直接调用脚本 您可以像上面一样使用BashOperator直接调用Python脚本。
您可以通过在BashOperator中使用以下命令来实现调用Python脚本,就像您没有使用Airflow一样。
python script.py

移动脚本并使用PythonOperator

如果这些脚本仅从Airflow调用,我建议将它们移动到您的Python代码库中,并根据需要调用其中的入口函数。

例如:

airflowHome/
  dags/
  plugins/
  scripts/
    __init__.py
    script1.py
    script2.py

现在您可以通过Python导入在脚本模块中的脚本。从那里,您可以使用PythonOperator在DAG内调用特定函数。

感谢@andscoop的回答。我现在已经修改了我的代码,所以Python正确地从scripts文件夹中被调用。t1 = BashOperator( task_id='task_1', bash_command='python /usr/local/airflow/scripts/script1.py', dag=dag )然而,我仍然遇到相同的问题,即我的Python脚本被作为Docker中的Bash进程运行,而不是作为Windows进程运行。 - undefined
我在第一次阅读时没有注意到那个具体的问题 - 我认为你所问的是不可能的,因为Docker容器的目的就是将进程限制在其中。 - undefined
啊,真可惜。我对人们如何在Docker中使用Airflow很感兴趣。对我来说,如果所有的东西都需要在Docker内处理,我觉得直接安装完整版的Ubuntu并在其中开发一切会更容易。如果人们在Docker内开发复杂的ETL流水线,我会感到惊讶。 - undefined
为什么你不能在Docker中进行开发呢?我已经在Docker中构建了几个ETL流水线。我还见过一些利用Docker内的Airflow和单独的pyspark容器来处理一些更复杂操作符的流水线。如果你想进一步讨论,请通过我的个人资料中的链接与我联系。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接