我写了一个Python应用程序,使用SQLAlchemy连接到Postgres数据库。在
我想知道是否有一种方法可以在
db.py
中定义了engine
和session
。engine = create_engine(URL(**settings.DATABASE))
session = scoped_session(sessionmaker(bind=engine))
大多数数据库操作都在service.py
中,该文件从db.py
中导入session
。
from app.db import engine, session
def get_door_ids():
result = session.query(ControllerDetail.door_id).distinct().all()
ids = [c[0] for c in result]
return ids
def get_last_health_cd(door_id):
result = session.query(ControllerDetail.door_health_cd).filter(ControllerDetail.door_id == door_id).order_by(ControllerDetail.etl_insert_ts.desc()).first()
return result[0]
现在一切都很好,但问题是我需要每隔几分钟重复执行相同的操作。因此,我在我的主模块中编写了以下代码:
try:
while True:
run_task()
time.sleep(120)
except KeyboardInterrupt:
print('Manual break by user')
数据库每隔一分钟就会超时空闲连接。因此,每次进程休眠超过1分钟时,我都会收到错误提示。
psycopg2.InternalError: terminating connection due to idle-in-transaction timeout
SSL connection has been closed unexpectedly
我想知道是否有一种方法可以在
time.sleep(120)
后关闭会话并重新打开它,以避免超时。也许在主模块中,我可以将 session
从 db
中作为全局变量导入,并以某种方式将其传递给 services
中的方法。我该怎么做?由于 main
导入了来自 services
的函数,因此我无法从 main
而不是 db
中导入 services
中的 session
。
run_task()
之前创建了scoped_session
,并在之后关闭了它。现在session
是run_task(session)
的参数。因此,所有其他函数也将以session
作为它们的参数。不确定将session
作为这些查询函数的参数是否是常见做法,但它可以正常工作。 - dddscoped_session
将会话的生命周期与工作线程的生命周期绑定。当您说在run_task()
之后关闭了作用域会话时,循环调用run_task()
并且它实际上在同一线程中运行吗?如果不是,则创建了一个会话,而工作线程则创建了另一个会话。 - Ilja Everilärun_task
的作用就是查询并发送电子邮件。 - dddscoped_session
。只需要从sessionmaker()
获取一个普通的Session
实例就可以了,将其传递是一个不错的模式;至少它使得测试更容易,并且促进了可重用性。 - Ilja Everilä