Postgres,SQLAlchemy和多进程

4
我对Python的多进程编程还比较新,遇到了很多相关问题。我最新的问题是在使用多进程、SQLAlchemy和PostgreSQL的组合时出现了一些问题。有时会出现一个错误。
 sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record mac

经过研究,我在文档中找到了这个提示:

https://docs.sqlalchemy.org/en/13/core/pooling.html“使用连接池时(通过create_engine()创建的Engine也是如此),关键是不要共享池化的连接到派生进程。TCP连接被表示为文件描述符,通常跨越进程边界工作,这意味着这将导致两个或多个完全独立的Python解释器状态并发访问文件描述符。

有两种处理方法。

第一种方法是,在子进程内创建一个新的Engine,或者在现有的Engine上,在子进程使用任何连接之前调用Engine.dispose()。 这将从池中删除所有现有的连接,以便它创建所有新的连接。"

还有这个:

这段话是关于在使用uWSGI、Flask、SQLAlchemy和PostgreSQL时遇到的SSL错误问题。问题最终原因是uwsgi的forking机制。当使用主进程和多个工作进程时,uwsgi会在主进程中初始化应用程序,然后将应用程序复制到每个工作进程中。问题在于,如果在初始化应用程序时打开数据库连接,则会导致多个进程共享同一个连接,从而引发上述错误。
您的理解是,在使用多进程时,必须确保每个进程都使用新的Engine。在您的子进程中只有一个类读写PostgreSQL数据库,因此您决定在该类内定义一个SQLAlchemy Engine。
class WS_DB_Booker():
    def __init__(self):

        engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
        Base_inside_class = declarative_base()
        Base_inside_class.metadata.create_all(engine_inside_class)
        session_factory_inside_class = sessionmaker(bind=engine_inside_class)
        self.DBSession_inside_class = scoped_session(session_factory_inside_class)

    def example_method_to_read_from_db(self):
        try:
            sql_alc_session = self.DBSession_inside_class()
            sql_alc_session.query(and_so_on....

这在最初的试验中运行良好,没有任何问题。但我不确定这是否是在类内定义引擎的适当方式,或者是否会导致任何问题?
1个回答

3

你如何分叉进程以及哪个组件进行了分叉并不是很清楚。

你需要确保的是,在分叉之后,实例化WS_DB_Broker

如果你用错误的方式(在分叉之前进行实例化),那么Engine可能已经引用了一些Pool中的dbapi连接。有关此内容,请参见SQLAlchemy与引擎一起使用的文档

为了使你的错误更加明显,你可以做如下操作:

import os

class WS_DB_Booker():
    def __init__(self):
        # Remember the process id from the time of instantiation. If the
        # interpreter is forked then the output of `os.getpid()` will change.
        self._pid = os.getpid()

        engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
        Base_inside_class = declarative_base()
        Base_inside_class.metadata.create_all(engine_inside_class)
        session_factory_inside_class = sessionmaker(bind=engine_inside_class)
        self._session = scoped_session(session_factory_inside_class)

    def get_session():
        if self._pid != os.getpid():
             raise RuntimeError("Forked after instantiating! Please fix!")
        return self._session()

    def example_method_to_read_from_db(self):
        try:
            sql_alc_session = self.get_session()  
            #                      ^^^^^^^^^^^^^
            # this may throw RuntimeError when used incorrectly, thus saving you
            # from your own mistake.
            sql_alc_session.query(and_so_on....

我在分叉后实例化该类。我的问题更关注于在类内定义引擎。 - Egirus Ornila
如果您能确保在程序的整个生命周期中只实例化该类一次,那么就可以了。否则,您必须想办法确保这样做。 - pi.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接