我们的DAGs需要一个自定义的存储库来帮助与数据库服务器通信。在该存储库中,我们使用pathlib获取配置文件的路径并传递给configparser。
基本上像这样:
import configparser
from pathlib import Path
config = configparser.ConfigParser()
p = Path(__file__)
p = p.parent
config_file_name = 'comms.conf'
config.read(p.joinpath('config', config_file_name))
这是Airflow中我所有的DAG都出现以下错误:
Broken DAG: [/opt/airflow/dags/example_folder/example_dag.py] 'PosixPath' object is not iterable
在命令行上的错误消息为:[2021-01-11 19:53:13,868] {dagbag.py:259} ERROR - Failed to import: /opt/airflow/dags/example_folder/example_dag.py
Traceback (most recent call last):
File "/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/airflow/models/dagbag.py", line 256, in process_file
m = imp.load_source(mod_name, filepath)
File "/opt/anaconda3/envs/airflow/lib/python3.7/imp.py", line 172, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 696, in _load
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/opt/airflow/example_folder/example_dag.py", line 8, in <module>
dag = Dag()
File "/opt/airflow/dags/util/dag_base.py", line 27, in __init__
self.comms = get_comms(Variable.get('environment'))
File "/opt/airflow/repository/repo_folder/custom_script.py", line 56, in get_comms
config = get_config('comms.conf')
File "/opt/airflow/repository/repo_folder/custom_script.py", line 39, in get_config
config.read(p.joinpath('config', config_file_name))
File "/opt/anaconda3/envs/airflow/lib/python3.7/site-packages/backports/configparser/__init__.py", line 702, in read
for filename in filenames:
TypeError: 'PosixPath' object is not iterable
我能够在docker容器之外复制这种行为,所以我不认为这与此有任何关系。这必须是airflow作为systemd服务运行和通过cli运行之间的区别?
这是我的有效的airflow服务文件:
[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service
[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/opt/anaconda3/envs/airflow/bin/airflow webserver
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
这是我在服务文件中使用的Airflow环境文件。请注意,在命令行界面中,我需要本地导出这些环境变量才能使Airflow运行到此处。还要注意,自定义仓库位于/opt/airflow目录中。
AIRFLOW_CONFIG=/opt/airflow/airflow.cfg
AIRFLOW_HOME=/opt/airflow
PATH=/bin:/opt/anaconda3/envs/airflow/bin:/opt/airflow/etl:/opt/airflow:$PATH
PYTHONPATH=/opt/airflow/etl:/opt/airflow:$PYTHONPATH
我的Airflow配置是默认的,除了以下更改:
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@192.168.x.x:5432/airflow
load_examples = False
logging_level = WARN
broker_url = amqp://guest:guest@127.0.0.1:5672/
result_backend = db+postgresql://airflow:airflow@192.168.x.x:5432/airflow
catchup_by_default = False
configparser==3.5.3
我的conda环境使用的是Python 3.7版本,Airflow版本为1.10.14。它在一台Centos7服务器上运行。如果有人有任何能够帮助的想法,我将非常感激!
编辑:如果我将config.read(p.joinpath('config',config_file_name))
这行代码更改为直接指向配置文件,例如config.read('/opt/airflow/repository/repo_folder/config/comms.conf')
,它可以正常工作。所以这似乎与configparser如何处理pathlib输出有关?但是如果通过systemd服务运行Airflow,它就没有问题?
编辑2:我也可以用str()函数包装pathlib对象并使其正常工作。config.read(str(p.joinpath('config',config_file_name)))
我只是想知道为什么这个对于systemd服务来说没问题...我担心其他东西会被破坏?
backports.configparser
(该版本已经有了本地的configparser
标准库模块),这很可疑--我怀疑某些东西不正确地设置了PYTHONPATH
或改变了sys.path
,以便将site-packages
放在标准库之前。 - anthony sottileconda env create -f airflow.yml python=3.7
但是出现了一些问题,当我激活该环境并运行airflow initdb
时,它会给我一堆ModuleNotFoundErrors,尽管我可以从该环境启动python并导入它所缺少的所有内容。为了使其正常工作,我将环境中的site-packages目录添加到PYTHONPATH中。你知道为什么airflow无法看到安装在同一环境中的软件包吗? - wymangrPYTHONPATH
?which airflow
会给你什么结果?你的设置和单元测试完全一样吗?我建议找出不同之处,然后消除这些差异。 - anthony sottile