Airflow DAG中的外部文件

27

我正在尝试在Airflow任务中访问外部文件以读取一些SQL,但是我收到了"文件未找到"的错误消息。有人遇到过这种情况吗?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dat',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query
    query = open('sql/queryfile.sql')
    # run the query
    execute(query)

tas = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)

日志记录如下:

IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'

我明白我可以简单地复制并粘贴查询到同一个文件中,但这实在不是一个整洁的解决方案。由于有多个查询且文本非常庞大,将其嵌入Python代码会影响可读性。

4个回答

24

这里有一个使用变量的示例,让它变得更容易。

  • 首先在Airflow UI -> Admin -> Variable中添加变量,例如:{key: 'sql_path', values: 'your_sql_script_folder'}

  • 然后在您的DAG中添加以下代码,即可使用Airflow中的变量。

DAG 代码:

import airflow
from airflow.models import Variable

tmpl_search_path = Variable.get("sql_path")

dag = airflow.DAG(
   'tutorial',
    schedule_interval="@daily",
    template_searchpath=tmpl_search_path,  # this
    default_args=default_args
)
  • 现在您可以在文件夹变量下使用SQL脚本名称或路径。

  • 您可以在这里了解更多信息。


请问,您能提供一个完整的例子吗?定义template_searchpath,这会改变整个脚本的行为,现在我可以通过文件名引用文件了吗?例如,这样是否可以完成您的示例:with open(query_file_name, 'r') as file: query_content = file.read() - ricoms
3
我认为这种方法无法在OP使用的带有PythonOperator和Python本地open()的示例DAG中实现。PythonOperator在一个Pod中运行,该Pod无法访问与解析DAG的进程相同的位置集合。 - LondonRob
2
@RicardoMS 你好,当你想要定义自己的 airflow.models.Variable 时,最简单的方法是通过 Airflow UI,即 homepage -> Admin -> Variables 来创建新变量,例如:{'Key': 'RicardoMS_variable', 'Val': '/opt/specific/path'}。完成后,你可以使用示例代码通过 tmpl_search_path = Variable.get("RicardoMS_variable") 来加载你的变量,而不是直接使用 '/opt/specific/path' - zhongjiajie
@LondonRob 我正在经历你所指出的问题。$AIRFLOW_HOME 环境变量被设置为 /opt/***,即使我直接在文件路径中使用其值 /opt/airflow,它也会自动地更改为 /opt/***,导致文件未找到错误。 - user2268997
@user2268997 在任务执行期间访问磁盘上的文件比较棘手,因为你尝试执行的实际 Python 代码并不在与 Airflow 安装相同的机器上运行:它在自己的基础设施中运行。因此,如果你需要在任务运行时访问磁盘上的文件,你必须明确地计划这一点。也许在解析 DAG 定义文件时加载文件,然后将结果作为字符串传递给任务? - LondonRob
@lonndonrob,结果发现有一个打字错误,代码是正确的,但我已经将容器中的文件夹挂载到了我的本地机器上。 这是否巧合,也许因为它是在单台机器上进行本地安装,或者这是预期的行为? - user2268997

9
假设sql目录相对于当前Python文件,你可以像这样找出sql文件的绝对路径:
import os

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

def run_query():
    # read the query
    query = open(f"{CUR_DIR}/sql/queryfile.sql")
    # run the query
    execute(query)

8

所有相关路径都是以AIRFLOW_HOME环境变量为参考的。请尝试:

  • 给出绝对路径
  • 将文件放在AIRFLOW_HOME相对路径下
  • 尝试在Python可调用函数中记录PWD,然后决定要给出哪个路径(最佳选项)

2
好的评论,但不幸的是AIRFLOW_HOME是一个可选的环境变量 - Airflow没有它也可以正常工作 - 而且你不能保证它会被设置。 - Kirk Broadhurst

0
您可以通过以下方式获取DAG目录。
conf.get('core', 'DAGS_FOLDER')

# open file
open(os.path.join(conf.get('core', 'DAGS_FOLDER'), 'something.json'), 'r')

参考:https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dags-folder

该页面介绍了Apache Airflow的配置文件中的“DAGs Folder”选项,这是存储具有DAG定义的Python文件的目录。可以在此目录中添加、删除或修改DAG文件,以启用Airflow调度这些任务。

conf.get('core', 'DAGS_FOLDER') NameError: name 'conf' is not defined - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接