我正在尝试在Airflow任务中访问外部文件以读取一些SQL,但是我收到了"文件未找到"的错误消息。有人遇到过这种情况吗?
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
dag = DAG(
'my_dat',
start_date=datetime(2017, 1, 1),
catchup=False,
schedule_interval=timedelta(days=1)
)
def run_query():
# read the query
query = open('sql/queryfile.sql')
# run the query
execute(query)
tas = PythonOperator(
task_id='run_query', dag=dag, python_callable=run_query)
日志记录如下:
IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'
我明白我可以简单地复制并粘贴查询到同一个文件中,但这实在不是一个整洁的解决方案。由于有多个查询且文本非常庞大,将其嵌入Python代码会影响可读性。
template_searchpath
,这会改变整个脚本的行为,现在我可以通过文件名引用文件了吗?例如,这样是否可以完成您的示例:with open(query_file_name, 'r') as file: query_content = file.read()
? - ricomsPythonOperator
和Python本地open()
的示例DAG中实现。PythonOperator
在一个Pod中运行,该Pod无法访问与解析DAG的进程相同的位置集合。 - LondonRobairflow.models.Variable
时,最简单的方法是通过 Airflow UI,即homepage -> Admin -> Variables
来创建新变量,例如:{'Key': 'RicardoMS_variable', 'Val': '/opt/specific/path'}
。完成后,你可以使用示例代码通过tmpl_search_path = Variable.get("RicardoMS_variable")
来加载你的变量,而不是直接使用'/opt/specific/path'
。 - zhongjiajie$AIRFLOW_HOME
环境变量被设置为/opt/***
,即使我直接在文件路径中使用其值/opt/airflow
,它也会自动地更改为/opt/***
,导致文件未找到错误。 - user2268997