Sql Alchemy 队列池限制溢出。

60

我有一个Sql Alchemy应用程序,它返回了TimeOut:

TimeoutError: 队列池大小限制为5,溢出10已达到,连接超时,超时时间30

我在不同的帖子中阅读到,当我没有关闭会话时,就会发生这种情况,但我不知道是否适用于我的代码:

我在init.py中连接到数据库:

from .dbmodels import (
    DBSession,
    Base,    

engine = create_engine("mysql://" + loadConfigVar("user") + ":" + loadConfigVar("password") + "@" + loadConfigVar("host") + "/" + loadConfigVar("schema"))

#Sets the engine to the session and the Base model class
DBSession.configure(bind=engine)
Base.metadata.bind = engine

然后在另一个 Python 文件中,我使用在 init.py 中初始化的 DBSession 在两个函数中收集了一些数据:

from .dbmodels import DBSession
from .dbmodels import resourcestatsModel

def getFeaturedGroups(max = 1):

    try:
        #Get the number of download per resource
        transaction.commit()
        rescount = DBSession.connection().execute("select resource_id,count(resource_id) as total FROM resourcestats")

        #Move the data to an array
        resources = []
        data = {}
        for row in rescount:
            data["resource_id"] = row.resource_id
            data["total"] = row.total
            resources.append(data)

        #Get the list of groups
        group_list = toolkit.get_action('group_list')({}, {})
        for group in group_list:
            #Get the details of each group
            group_info = toolkit.get_action('group_show')({}, {'id': group})
            #Count the features of the group
            addFesturedCount(resources,group,group_info)

        #Order the FeaturedGroups by total
        FeaturedGroups.sort(key=lambda x: x["total"],reverse=True)

        print FeaturedGroups
        #Move the data of the group to the result array.
        result = []
        count = 0
        for group in FeaturedGroups:
            group_info = toolkit.get_action('group_show')({}, {'id': group["group_id"]})
            result.append(group_info)
            count = count +1
            if count == max:
                break

        return result
    except:
        return []


    def getResourceStats(resourceID):
        transaction.commit()
        return  DBSession.query(resourcestatsModel).filter_by(resource_id = resourceID).count()

会话变量是这样创建的:

#Basic SQLAlchemy types
from sqlalchemy import (
    Column,
    Text,
    DateTime,
    Integer,
    ForeignKey
    )
# Use SQLAlchemy declarative type
from sqlalchemy.ext.declarative import declarative_base

#
from sqlalchemy.orm import (
    scoped_session,
    sessionmaker,
    )

#Use Zope' sqlalchemy  transaction manager
from zope.sqlalchemy import ZopeTransactionExtension

#Main plugin session
DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))

由于会话(session)是在init.py中创建的,而在后续代码中我只是使用它;那么我需要在什么时候关闭会话呢?或者说我还需要做什么来管理池的大小呢?


在第二段代码片段中,transaction 定义在哪里? - Tom Dalton
getResourceStats 函数在哪里被使用了?看起来代码的一半都不见了,你能否把它全部添加进去,否则可能无法诊断问题。 - Tom Dalton
1
这段代码能直接运行吗?import语句看起来非常奇怪,除了@TomDalton提到的问题之外。错误是在什么时候发生的?这是最小完整可验证示例吗?也就是说,您能否将其减少(或显著减少)并仍然展示相同的行为?似乎有很多业务逻辑在其中,这很不可能是问题的原因。尝试删除它,并发布一个完整的工作(语法正确)的示例,我很乐意帮助。 - cod3monk3y
3个回答

75

您可以通过在函数create_engine中添加参数pool_sizemax_overflow来管理连接池的大小。

engine = create_engine("mysql://" + loadConfigVar("user") + ":" + loadConfigVar("password") + "@" + loadConfigVar("host") + "/" + loadConfigVar("schema"), 
                        pool_size=20, max_overflow=0)

参考文档在这里

您不需要关闭会话,但在事务完成后应关闭连接。 请替换为:

rescount = DBSession.connection().execute("select resource_id,count(resource_id) as total FROM resourcestats")

作者:

connection = DBSession.connection()
try:
    rescount = connection.execute("select resource_id,count(resource_id) as total FROM resourcestats")
    #do something
finally:
    connection.close()

参考资料在这里

此外,注意MySQL的连接如果长时间不用会被关闭(这个时间段可以在MySQL中配置,默认值我不记得了),所以您需要在引擎创建时传递pool_recycle值。


1
MySQL中非活动会话的默认超时时间为8小时,适用于交互式和非交互式连接: [1] https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_wait_timeout [2] https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_interactive_timeout - Ludecan
你提到我们应该关闭连接而不是会话。但是 fastAPI 的示例说我们应该关闭会话。https://fastapi.tiangolo.com/tutorial/sql-databases/#create-a-dependency - Rahul Kumar
关闭会话并不会关闭连接,但它可以防止错误。如果导入contextlib.closing,那么代码只需要写成with closing(Session()) as session: # do stuff - Peter

7

在你的代码中添加以下方法。它将自动关闭所有未使用/挂起的连接并防止代码瓶颈。特别是如果您正在使用以下语法 Model.query.filter_by(attribute=var).first() 和关系/惰性加载。

在你的代码中加入以下方法。它会自动关闭所有未使用或者挂起的连接,避免代码瓶颈。特别是当你使用以下语法Model.query.filter_by(attribute=var).first()以及关系/惰性加载时。
   @app.teardown_appcontext
    def shutdown_session(exception=None):
        db.session.remove()

这里提供有关此文档的信息:http://flask.pocoo.org/docs/1.0/appcontext/


请提供要翻译的英文内容。 - Callam Delaney
我已经在我们的应用程序中尝试过这个方法,但对我们个人来说并没有解决问题。 - nikk wong
1
如果您使用Flask-SQLAlchemy模块,这已经配置好了,请在此处进行检查。 - Georgios Syngouroglou

1
rescount = DBSession.connection().execute()

rescount<class 'sqlalchemy.engine.cursor.CursorResult'> 类型。

你应该调用 close() 函数。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接