Airflow调度器通过WSL无法执行Windows EXE

3

我的Windows 10机器上安装了Airflow 1.10.11,位于WSL 2(Ubuntu-20.04)内部。

我有一个BashOperator任务,它通过/mnt/c/...或符号链接在Windows上调用.EXE。 但是该任务失败了,日志显示:

[2020-12-16 18:34:11,833] {bash_operator.py:134} INFO - Temporary script location: /tmp/airflowtmp2gz6d79p/download.legacyFilesnihvszli
[2020-12-16 18:34:11,833] {bash_operator.py:146} INFO - Running command: /mnt/c/Windows/py.exe
[2020-12-16 18:34:11,836] {bash_operator.py:153} INFO - Output:
[2020-12-16 18:34:11,840] {bash_operator.py:159} INFO - Command exited with return code 1
[2020-12-16 18:34:11,843] {taskinstance.py:1150} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.8/dist-packages/airflow/operators/bash_operator.py", line 165, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2020-12-16 18:34:11,844] {taskinstance.py:1187} INFO - Marking task as FAILED. dag_id=test-dag, task_id=download.files, execution_date=20201216T043701, start_date=20201216T073411, end_date=20201216T073411

就是这样了。返回代码为1,没有更多有用的信息。
通过bash运行相同的EXE文件可以完美运行,没有错误(我也尝试了我的程序,它会向控制台发出一些信息 - 在bash中它可以正常输出,但是通过airflow scheduler执行时会出现同样的错误1)。
更多的数据和我排除其他问题所做的事情:
  • airflow scheduler以root身份运行。我还确认它在BashOperator中运行时处于root上下文中,因为我在其中放置了一个whoami命令,在BashOperator中确实输出了root(我还应该注意到,所有原生的Linux程序都可以正常运行!只有Windows程序不行。)
  • 我正在尝试执行的Windows EXE及其目录具有完整的“Everyone”权限(当然,我不敢在Windows文件夹上这样做 - 这只是一个例子)。
  • 无论是通过/mnt/c访问还是通过符号链接访问,失败都会发生。在符号链接的情况下,符号链接具有777权限。
  • 我尝试在BashOperator任务上运行airflow test - 它可以完美运行 - 向控制台输出并返回0(成功)。
  • 尝试使用各种EXE文件 - 包括“本地”(例如随Windows安装的文件)以及我制作的C#程序。在所有情况下都有相同的行为。
  • 在Airflow的GitHub存储库中没有找到任何类似的问题文档,也没有在Stack Overflow中找到任何类似的问题。
问题是:Airflow的Python进程启动方式(即airflow scheduler使用运行Bash Operators)与“正常”的Bash方式有何不同,导致error 1
1个回答

3
你可以使用Python和PowerShell的subprocess和sys库。
在Airflow > Dags文件夹中,创建2个文件:main.py和caller.py。
因此,main.py调用caller.py,而caller.py在机器(Windows)上运行文件或例程。
这是整个过程:

enter image description here

代码 Main.py:

# Importing the libraries we are going to use in this example
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator


# Defining some basic arguments
default_args = {
   'owner': 'your_name_here',
   'depends_on_past': False,
   'start_date': datetime(2019, 1, 1),
   'retries': 0,
   }


# Naming the DAG and defining when it will run (you can also use arguments in Crontab if you want the DAG to run for example every day at 8 am)
with DAG(
       'Main',
       schedule_interval=timedelta(minutes=1),
       catchup=False,
       default_args=default_args
       ) as dag:

# Defining the tasks that the DAG will perform, in this case the execution of two Python programs, calling their execution by bash commands
    t1 = BashOperator(
       task_id='caller',
       bash_command="""
       cd /home/[Your_Users_Name]/airflow/dags/
       python3 Caller.py
       """)

    # copy t1, paste, rename t1 to t2 and call file.py
    
# Defining the execution pattern
    t1

    # comment: t1 execute and call t2
    # t1 >> t2

代码调用者.py

import subprocess, sys

p = subprocess.Popen(["powershell.exe"
                     ,"cd C:\\Users\\[Your_Users_Name]\\Desktop; python file.py"] # file .py
                    #,"cd C:\\Users\\[Your_Users_Name]\\Desktop; .\file.html"]    # file .html
                    #,"cd C:\\Users\\[Your_Users_Name]\\Desktop; .\file.bat"]     # file .bat
                    #,"cd C:\\Users\\[Your_Users_Name]\\Desktop; .\file.exe"]     # file .exe
                    , stdout=sys.stdout
                     )

p.communicate()

如何知道你的代码在Airflow中是否能正常运行,如果运行,那就没问题了。

enter image description here


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接