SQLAlchemy使用连接池时未关闭数据库连接

4
我发现 SQLAlchemy 在我的情况下不会释放数据库连接,因此这些连接会积累到可能导致服务器崩溃的程度。这些连接来自不同的线程。
以下是简化后的代码:
"""
Test to see DB connection allocation size while making call from multiple threads
"""

from time import sleep
from threading import Thread, current_thread
import uuid

from sqlalchemy import func, or_, desc
from sqlalchemy import event
from sqlalchemy import ForeignKey, Column, Integer, String, DateTime, UniqueConstraint
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import relationship
from sqlalchemy.orm import scoped_session, Session
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.types import Integer, DateTime, String, Boolean, Text, Float
from sqlalchemy.engine import Engine
from sqlalchemy.pool import NullPool

# MySQL
SQLALCHEMY_DATABASE = 'mysql'
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://amalgam:amalgam@localhost/amalgam?charset=utf8mb4' # https://dev59.com/06bja4cB1Zd3GeqPoPLZ
SQLALCHEMY_ECHO = False
SQLALCHEMY_ENGINE_OPTIONS = {'pool_size': 40, 'max_overflow': 0}
SQLALCHEMY_ISOLATION_LEVEL = "AUTOCOMMIT"

# DB Engine

# engine = create_engine(SQLALCHEMY_DATABASE_URI, echo=SQLALCHEMY_ECHO, pool_recycle=3600,
#                        isolation_level= SQLALCHEMY_ISOLATION_LEVEL,
#                        **SQLALCHEMY_ENGINE_OPTIONS
#                        ) #  Connect to server

engine = create_engine(SQLALCHEMY_DATABASE_URI, 
                        echo=SQLALCHEMY_ECHO, 
                        # poolclass=NullPool,
                        pool_recycle=3600,
                       isolation_level= SQLALCHEMY_ISOLATION_LEVEL,
                       **SQLALCHEMY_ENGINE_OPTIONS
                       ) #  Connect to server


session_factory = sessionmaker(bind=engine)
Base = declarative_base()

# ORM Entity
class User(Base):

    LEVEL_NORMAL = 'normal'
    LEVEL_ADMIN = 'admin'

    __tablename__ = "users"
    id = Column(Integer, primary_key=True)    
    name = Column(String(100), nullable=True)
    email = Column(String(100), nullable=True, unique=True)
    password = Column(String(100), nullable=True)
    level = Column(String(100), default=LEVEL_NORMAL)


# Workers
NO = 10
workers = []

_scoped_session_factory = scoped_session(session_factory)


def job(job_id):
    session = _scoped_session_factory()

    print("Job is {}".format(job_id))

    user = User(name='User {} {}'.format(job_id, uuid.uuid4()), email='who cares {} {}'.format(job_id, uuid.uuid4()))

    session.add(user)
    session.commit()
    session.close()

    print("Job {} done".format(job_id))
    sleep(10)
    
# Create worker threads
for i in range(NO):
    workers.append(Thread(target=job, kwargs={'job_id':i}))

# Start them
for worker in workers:
    worker.start()

# Join them
for worker in workers:
    worker.join()

# Allow some time to see MySQL's "show processlist;" command
sleep(10)


程序达到以下瞬间:
sleep(10)

我负责运营

show processlist;

这个结果表明所有与数据库的连接仍然存在。

enter image description here

我该如何强制关闭这些连接?

注意:我可以使用

poolclass=NullPool

但我觉得那种解决方案太过严格了 - 我希望仍然能够访问数据库池,但可以在需要时关闭连接。

1个回答

5
以下内容来自于QueuePool构造函数的签名 池大小 - 要维护的池的大小,默认为5。这是将保持在池中的最大连接数。请注意,池开始时没有连接;一旦请求了这些连接数,那么这些连接数将保留。可以将pool_size设置为0以表示没有大小限制;要禁用池化,请使用NullPool
最大溢出量 - 池的最大溢出量。当已检出的连接数达到pool_size设置的大小时,将返回额外的连接,直到达到此限制。当这些额外的连接被返回到池中时,它们将被断开并且丢弃。因此,池允许的同时连接总数是pool_size + max_overflow,而池允许的“睡眠”连接总数是pool_size。max_overflow可以设置为-1以表示没有溢出限制;不会对并发连接的总数施加任何限制。默认为10。
SQLALCHEMY_ENGINE_OPTIONS = {'pool_size': 40, 'max_overflow': 0}

考虑到上述情况,这个配置要求SQLAlchemy保持最多40个连接。
如果你不喜欢这样的设置,但是想要保留一些可用的连接,你可以尝试这样的配置:
SQLALCHEMY_ENGINE_OPTIONS = {'pool_size': 10, 'max_overflow': 30}

这将保持10个持久连接在池中,并且如果同时请求,将会爆发到40个连接。超过配置的池大小的任何连接在被检查回池时立即关闭。

你提到了“重新加入池中”,在ORM中如何实现呢?是调用session.close()吗? - Alex
没错。我想,如果会话用完了引用,连接也会自动释放回池中。 - SuperShoot

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接