Connection problems with SQLAlchemy and multiple processes

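I'm using PostgreSQL and SQLAlchemy in a project that consists of a main process which launches child processes. All of these processes access the database via SQLAlchemy.
I'm experiencing repeatable connection failures: the first few child processes work correctly, but after a while a connection error is raised. Here's an MWCE: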
import multiprocessing

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

engine = None
Session = None
session = None

def init():
    global engine, Session, session
    engine = create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()

def cleanup():
    session.close()
    engine.dispose()

def target(id):
    init()
    try:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()
    finally:
        cleanup()

def main():
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2
    finally:
        cleanup()

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1

On my system (PostgreSQL 9.6, SQLAlchemy 1.1.4, psycopg2 2.6.2, Python 2.7, Ubuntu 14.04), this produces the following:

1
2
3
4
5
6
7
8
9
10
11
Traceback (most recent call last):
  File "./fork_test.py", line 64, in <module>
    main()
  File "./fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1422, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2756, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2726, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2797, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2820, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 469, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 11074}]

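This is a reproducible problem: it always crashes at the same iteration.
I'm creating a new engine and session after the fork, as recommended by the SQLAlchemy documentation and elsewhere. Interestingly, the following slightly different approach does not crash: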
import contextlib
import multiprocessing

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

@contextlib.contextmanager
def get_session():
    engine = sqlalchemy.create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        yield session
    finally:
        session.close()
        engine.dispose()

def target(id):
    with get_session() as session:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()

def main():
    with get_session() as session:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1

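Since the original code is more complex and cannot simply be converted to the latter version, I'd like to understand why one of them works and the other doesn't.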

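The only obvious difference is that the crashing code uses global variables to store the engine and the session; these are shared with the child process via copy-on-write. However, since I reset them directly after the fork, I don't see how that could be a problem.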

Update

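I re-ran both code snippets with the latest SQLAlchemy (1.1.5) on Python 2.7 and Python 3.4. The results are essentially the same as described above. However, on Python 2.7 the crash of the first snippet now happens in the 13th iteration (reproducibly), while on 3.4 it already happens in the third iteration (also reproducibly). The second snippet runs fine on both versions. Here's the traceback from 3.4: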

1
2
3
Traceback (most recent call last):
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "fork_test.py", line 64, in <module>
    main()
  File "fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 1424, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2749, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2719, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2790, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2813, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 3397}]

Here's the PostgreSQL log (the same for 2.7 and 3.4):
2017-01-18 10:59:36 UTC [22429-1] LOG:  database system was shut down at 2017-01-18 10:59:35 UTC
2017-01-18 10:59:36 UTC [22429-2] LOG:  MultiXact member wraparound protections are now enabled
2017-01-18 10:59:36 UTC [22428-1] LOG:  database system is ready to accept connections
2017-01-18 10:59:36 UTC [22433-1] LOG:  autovacuum launcher started
2017-01-18 10:59:36 UTC [22435-1] [unknown]@[unknown] LOG:  incomplete startup packet
2017-01-18 11:00:10 UTC [22466-1] user@db LOG:  SSL error: decryption failed or bad record mac
2017-01-18 11:00:10 UTC [22466-2] user@db LOG:  could not receive data from client: Connection reset by peer

(Note that the message about the incomplete startup packet is harmless.)

Which version of Python are you using? - Thomas Moreau
@ThomasMoreau: "PostgreSQL 9.6, SQLAlchemy 1.1.4, psycopg2 2.6.2, Python 2.7, Ubuntu 14.04" ;) - Florian Brucker
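It works fine on my system... Do you have any logs from your PostgreSQL server? Also, does it crash with Python 3? - Thomas Moreau
Tested with PostgreSQL 9.5, SQLA 1.1.3 and 1.1.5, Python 2.7 and 3.5, and could not reproduce. Not helpful to you, just for the record. - Ilja Everilä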
Error: sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record mac - Ilja Everilä
1 Answer


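Quoting "How do I use engines / connections / sessions with Python multiprocessing, or os.fork()?", with emphasis added:

The SQLAlchemy Engine object refers to a connection pool of existing database connections. So when this object is replicated to a child process, the goal is to ensure that no database connections are carried over.

However, for the case of a transaction-active Session or Connection being shared, there's no automatic fix for this; the application needs to ensure a new child process only initiates new Connection objects and transactions, as well as ORM Session objects.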

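The problem stems from the forked child process inheriting the live global session, which is holding on to a Connection. When target calls init, it overwrites the global references to engine and session, which decreases their reference counts in the child to 0 and forces them to be finalized. If you, for example, somehow create another reference to the inherited session in the child, you prevent it from being cleaned up, but don't do that. After main has joined the child and returns to business as usual, it tries to use the now potentially finalized, or otherwise out of sync, connection. As for why this causes an error only after a certain number of iterations, I'm not sure.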
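The refcount effect described above can be observed without a database. In this stdlib-only sketch, FakeConnection is a hypothetical stand-in that merely records which process finalized it. Rebinding the inherited global in a forked child (as init() does in the question) finalizes the parent's object inside the child, while leaving the global alone does not. It assumes CPython's immediate refcount-based finalization and the POSIX "fork" start method.

```python
import multiprocessing
import os
import tempfile

# File where finalizations that happen outside the owning process are logged.
LOG_PATH = os.path.join(tempfile.mkdtemp(), "finalized.log")


class FakeConnection:
    """Hypothetical stand-in for an inherited connection object."""

    def __init__(self):
        self.owner_pid = os.getpid()

    def __del__(self):
        # A real connection's cleanup would talk to the server here, over the
        # socket it shares with the parent. We only record where it happened.
        if os.getpid() != self.owner_pid:
            with open(LOG_PATH, "a") as f:
                f.write("%d\n" % os.getpid())


conn = None  # module-level global, like `engine`/`session` in the question


def child(rebind):
    global conn
    if rebind:
        # Like init() in the child: the inherited object's refcount drops to
        # zero and its finalizer runs immediately, inside the child.
        conn = FakeConnection()


def run(rebind):
    """Return the PIDs of processes that finalized the parent's object."""
    global conn
    conn = FakeConnection()
    open(LOG_PATH, "w").close()  # start from an empty log
    ctx = multiprocessing.get_context("fork")  # POSIX only
    p = ctx.Process(target=child, args=(rebind,))
    p.start()
    p.join()
    with open(LOG_PATH) as f:
        return [int(line) for line in f]
```

A forked multiprocessing child leaves through os._exit(), which skips interpreter cleanup, so an inherited object that is never rebound is never finalized in the child. That is consistent with the second example only appearing to work.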

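The only way to handle this situation using globals the way you do is to

  1. close all sessions
  2. call engine.dispose()

before forking. This prevents connections from leaking into the child. For example: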

def main():
    global session
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        dummy_id = dummy.id
        # Return the Connection to the pool
        session.close()
        # Dispose of it!
        engine.dispose()
        # ...or call your cleanup() function, which does the same
        p = multiprocessing.Process(target=target, args=(dummy_id,))
        p.start()
        p.join()
        # Start a new session
        session = Session()
        dummy = session.query(Dummy).get(dummy_id)
        assert dummy.value == 2
    finally:
        cleanup()

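Your second example does not trigger finalization in the child, so it only seems to work. It may be just as broken as the first one, since the child still inherits a copy of the session, and its connection, defined locally in main.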


Wouldn't scoped_session help here? https://docs.sqlalchemy.org/en/latest/orm/contextual.html - Lucas03
@lucas03 No, a scoped session is a thread-local session registry. - Ilja Everilä
Right; in practice we call engine.dispose in gunicorn's post_fork and in Celery's on_worker_process_init. - Lucas03
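In some cases I've found it necessary to call gc.collect() immediately after dispose(); otherwise the forked process may try to collect dead engine-related garbage, causing the same OperationalError reported above. - Luke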
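The two steps from the answer, plus the gc.collect() suggested in this comment, can be bundled into one helper that the parent calls right before forking. prepare_engine_for_fork is a hypothetical name, the in-memory SQLite engine only stands in for the real database, and the gc.collect() step rests on this comment's report rather than on documented SQLAlchemy behaviour.

```python
import gc

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session


def prepare_engine_for_fork(engine, *sessions):
    """Hypothetical helper: call in the parent immediately before forking."""
    # 1. Close every session so none of them holds a Connection or transaction.
    for session in sessions:
        session.close()
    # 2. Close all pooled connections and swap in a new, empty pool.
    engine.dispose()
    # 3. Collect leftover engine-related garbage now, in the parent, rather
    #    than leaving it for a forked child to finalize later.
    gc.collect()


# Usage sketch: an in-memory SQLite engine stands in for the real database.
engine = create_engine("sqlite://")
session = Session(bind=engine)
session.execute(text("SELECT 1"))
prepare_engine_for_fork(engine, session)
# ...start child processes here; each child builds its own engine/session.
```

The parent's session remains usable afterwards; its next query simply checks out a fresh connection from the new pool.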
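@Lucas03 @Ilja Everilä, could we use scoped_session with a scopefunc such as str(os.getpid()) + str(threading.local().__hash__())? https://docs.sqlalchemy.org/en/13/orm/contextual.html#using-custom-created-scopes Are there cases where a single-process, multi-threaded application would run into problems with that? - ludaavics
In a single-process, multi-threaded application the default behaviour is sufficient, as long as your session lifetimes are tied to threads. In a multi-process application a scoped session will still leak connections if it is populated before forking, so it should be created and treated as belonging to a single process. It can handle scoping within a process, but not across processes. - Ilja Everilä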
