Retrying failed SQLAlchemy queries

14
Every time I restart the MySQL service, my application gets the following error on whatever query it runs next:
result = self._query(query)
  File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 516, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 727, in _read_query_result
    result.read()
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 1066, in read
    first_packet = self.connection._read_packet()
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 656, in _read_packet
    packet_header = self._read_bytes(4)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 702, in _read_bytes
    CR.CR_SERVER_LOST, "Lost connection to MySQL server during query")
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2013, 'Lost connection to MySQL server during query') [SQL: ...] [parameters: {...}] (Background on this error at: http://sqlalche.me/e/e3q8)

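Any query after that succeeds as usual.

This is just one common example; in general, I may want to retry any query depending on the error.

Is there a way to catch the error and retry the query at some low level of the SQLAlchemy API? Wrapping calls in try-except or using a custom query method is not reasonable in my code, because I would have to use it in too many places and it would not be maintainable.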


1
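What is wrong with using a custom query or a wrapper around it? Using it as the basis for a "try...except..." block around each query seems like the most sensible solution here. Do you have a code snippet that demonstrates the problem in [MCVE] form? - r.ook
4 Answers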

10
Thanks a lot for sharing this code. I had to modify it slightly so that it works directly with sqlalchemy.orm, in case it helps anyone.
import logging
from time import sleep

from sqlalchemy.exc import OperationalError, StatementError
from sqlalchemy.orm.query import Query as _Query


class RetryingQuery(_Query):
    __max_retry_count__ = 3

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __iter__(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except OperationalError as ex:
                # Retry only when the server dropped the connection.
                if "server closed the connection unexpectedly" not in str(ex):
                    raise
                if attempts <= self.__max_retry_count__:
                    # Exponential backoff: 1s, 2s, 4s, ...
                    sleep_for = 2 ** (attempts - 1)
                    logging.error(
                        "/!\\ Database connection error: retrying Strategy => sleeping for {}s"
                        " and will retry (attempt #{} of {}) \n Detailed query impacted: {}".format(
                            sleep_for, attempts, self.__max_retry_count__, ex
                        )
                    )
                    sleep(sleep_for)
                    continue
                else:
                    raise
            except StatementError as ex:
                if "reconnect until invalid transaction is rolled back" not in str(ex):
                    raise
                # Roll back the invalid transaction, then loop and try again.
                self.session.rollback()

Usage: pass the class to sessionmaker through the query_cls option:

sqlalchemy.orm.sessionmaker(bind=engine, query_cls=RetryingQuery)
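To see how query_cls wires a custom class into a session, here is a minimal sketch. CountingQuery is a hypothetical stand-in for RetryingQuery that only counts __iter__ calls, and the in-memory SQLite engine is an assumption chosen so the example runs without a server.

```python
from sqlalchemy import create_engine, literal
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.query import Query


class CountingQuery(Query):
    """Hypothetical stand-in for RetryingQuery: it only counts __iter__ calls."""

    iter_calls = 0

    def __iter__(self):
        type(self).iter_calls += 1
        return super().__iter__()


# In-memory SQLite, used here only for illustration.
engine = create_engine("sqlite://")
Session = sessionmaker(bind=engine, query_cls=CountingQuery)
session = Session()

# session.query() now builds instances of the custom class.
query = session.query(literal(1))
rows = list(query)  # explicit iteration goes through __iter__
```

Explicit iteration is used on purpose: depending on the SQLAlchemy version, methods such as all() or first() may not route through __iter__, so it is worth checking which calls an override like this actually covers.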

8

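Edit: The creator of SQLAlchemy commented that this approach is not recommended.

Apparently, SQLAlchemy has a nice option for customizing the query class, which is exactly what I needed.

Class implementation: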

import logging
from flask_sqlalchemy import BaseQuery
from sqlalchemy.exc import OperationalError
from time import sleep

class RetryingQuery(BaseQuery):

    __retry_count__ = 3
    __retry_sleep_interval_sec__ = 0.5

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __iter__(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except OperationalError as ex:
                if "Lost connection to MySQL server during query" not in str(ex):
                    raise
                if attempts < self.__retry_count__:
                    logging.debug(
                        "MySQL connection lost - sleeping for %.2f sec and will retry (attempt #%d)",
                        self.__retry_sleep_interval_sec__, attempts
                    )
                    sleep(self.__retry_sleep_interval_sec__)
                    continue
                else:
                    raise

Usage:

class BaseModel(Model):
    ...
    query_class = RetryingQuery
    ...

db = SQLAlchemy(model_class=BaseModel, query_class=RetryingQuery)

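Can you explain the logic behind it? - Deva
By intercepting the error this way, you handle all queries without having to wrap the whole SQLAlchemy API. - Mugen
I'm somewhat new to this; could you explain how overriding the __iter__ method ends up retrying the query? - Deva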
1
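@Deva This takes some digging in the SQLAlchemy code. Most query executors end up calling __iter__ as the lowest-level common point. I found this out from their source code and by trial and error. It is definitely not fully reliable and may not cover every case. - Mugen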
27
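Hi, I'm the author of SQLAlchemy. I was just pointed to this post. I strongly recommend against using a pattern like the one above. "Retrying" on a connection failure should only happen at the top level of a transaction, which is why the pool's pre-ping feature exists. If the connection is lost in the middle of a transaction, the whole operation needs to be re-run. Explicit is better than implicit. - zzzeek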
2
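@zzzeek Thanks for pointing this out. Indeed, explicit is better, and I would avoid this approach today as well. That said, I have run into cases where the explicit approach was simply not feasible. - Mugen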
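A minimal sketch of the explicit alternative described in the comments: enable pool pre-ping on the engine and retry the whole unit of work at the top of the transaction. The helper run_in_transaction, its parameters, and the in-memory SQLite URL are assumptions made for illustration; pool_pre_ping and DBAPIError.connection_invalidated are existing SQLAlchemy features.

```python
import time

from sqlalchemy import create_engine, text
from sqlalchemy.exc import DBAPIError
from sqlalchemy.orm import sessionmaker


def run_in_transaction(session_factory, work, max_attempts=3,
                       base_delay=0.5, sleep=time.sleep):
    """Run work(session) in a fresh transaction, re-running it from the
    start if the connection was invalidated (hypothetical helper)."""
    for attempt in range(1, max_attempts + 1):
        session = session_factory()
        try:
            result = work(session)
            session.commit()
            return result
        except DBAPIError as ex:
            session.rollback()
            # Retry only on disconnects, and only while attempts remain.
            if not ex.connection_invalidated or attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
        finally:
            session.close()


# pool_pre_ping tests each pooled connection when it is checked out.
engine = create_engine("sqlite://", pool_pre_ping=True)
Session = sessionmaker(bind=engine)

value = run_in_transaction(
    Session, lambda s: s.execute(text("SELECT 1")).scalar()
)
```

Because work is re-run from the beginning with a new session, nothing from a half-finished transaction is reused, which is the point of retrying at the top level rather than inside a single query.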

1
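I had to tweak it slightly to make it work with Postgres, which has a different error message. I know the question is tagged mysql, but I found it through a search (and had exactly the same problem), so this may help someone.
I also had to catch StatementError: (sqlalchemy.exc.InvalidRequestError) Can't reconnect until invalid transaction is rolled back, otherwise the retry could not proceed and Flask crashed as well.
Finally, I made it exponential backoff, because why not?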
import logging
from flask_sqlalchemy import BaseQuery
from sqlalchemy.exc import OperationalError, StatementError
from time import sleep

class RetryingQuery(BaseQuery):
    __retry_count__ = 3

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __iter__(self):
        attempts = 0
        while True:
            attempts += 1
            try:
                return super().__iter__()
            except OperationalError as ex:
                if "server closed the connection unexpectedly" not in str(ex):
                    raise
                if attempts < self.__retry_count__:
                    sleep_for = 2 ** (attempts - 1)
                    logging.error(
                        "Database connection error: {} - sleeping for {}s"
                        " and will retry (attempt #{} of {})".format(
                            ex, sleep_for, attempts, self.__retry_count__
                        )
                    )
                    sleep(sleep_for)
                    continue
                else:
                    raise
            except StatementError as ex:
                if "reconnect until invalid transaction is rolled back" not in str(ex):
                    raise
                self.session.rollback()
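For reference, the sleep schedule produced by 2 ** (attempts - 1) can be sketched in isolation. The function name backoff_delays is hypothetical. Note that with the `attempts < __retry_count__` check above and a count of 3, only the first two delays are actually slept before the third failure is re-raised.

```python
def backoff_delays(sleeps):
    """Return the delays (in seconds) for the given number of sleeps,
    doubling each time: 1, 2, 4, ... (hypothetical helper)."""
    return [2 ** (attempt - 1) for attempt in range(1, sleeps + 1)]


delays = backoff_delays(3)
total_wait = sum(delays)
```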

1

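SQLAlchemy also lets you listen for the engine_connect event, which fires before a connection is created. That makes it possible to implement custom logic for pessimistic disconnect handling.

The snippet below implements retries with exponential backoff. It is taken from Apache Airflow's SQLAlchemy utils: http://airflow.apache.org/docs/1.10.3/_modules/airflow/utils/sqlalchemy.html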

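import logging
import random
import time

from sqlalchemy import event, exc, select

# Assumption: a plain module logger stands in for the logger used in the Airflow module.
log = logging.getLogger(__name__)


def setup_event_handlers(engine, reconnect_timeout_seconds,
                         initial_backoff_seconds, max_backoff_seconds):
    # Enclosing function: it supplies the engine and the timing parameters used below.
    @event.listens_for(engine, "engine_connect")
    def ping_connection(connection, branch):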
        """
        Pessimistic SQLAlchemy disconnect handling. Ensures that each
        connection returned from the pool is properly connected to the database.

        http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic
        """
        if branch:
            # "branch" refers to a sub-connection of a connection,
            # we don't want to bother pinging on these.
            return

        start = time.time()
        backoff = initial_backoff_seconds

        # turn off "close with result".  This flag is only used with
        # "connectionless" execution, otherwise will be False in any case
        save_should_close_with_result = connection.should_close_with_result

        while True:
            connection.should_close_with_result = False

            try:
                connection.scalar(select([1]))
                # If we made it here then the connection appears to be healthy
                break
            except exc.DBAPIError as err:
                if time.time() - start >= reconnect_timeout_seconds:
                    log.error(
                        "Failed to re-establish DB connection within %s secs: %s",
                        reconnect_timeout_seconds,
                        err)
                    raise
                if err.connection_invalidated:
                    log.warning("DB connection invalidated. Reconnecting...")

                    # Use a truncated binary exponential backoff. Also includes
                    # a jitter to prevent the thundering herd problem of
                    # simultaneous client reconnects
                    backoff += backoff * random.random()
                    time.sleep(min(backoff, max_backoff_seconds))

                    # run the same SELECT again - the connection will re-validate
                    # itself and establish a new connection.  The disconnect detection
                    # here also causes the whole connection pool to be invalidated
                    # so that all stale connections are discarded.
                    continue
                else:
                    log.error(
                        "Unknown database connection error. Not retrying: %s",
                        err)
                    raise
            finally:
                # restore "close with result"
                connection.should_close_with_result = save_should_close_with_result
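The backoff arithmetic in the handler above can be looked at on its own. This is a small sketch, not part of the Airflow code: the function jittered_backoff and its parameters are hypothetical, and it only reproduces the two lines `backoff += backoff * random.random()` and `min(backoff, max_backoff_seconds)`.

```python
import random


def jittered_backoff(initial, maximum, steps, rng=random):
    """Return the sleep durations of a truncated, jittered backoff.

    Each step grows the backoff by a random factor in [1, 2) and caps
    the value that is actually slept at `maximum`.
    """
    backoff = initial
    delays = []
    for _ in range(steps):
        backoff += backoff * rng.random()  # jitter: multiply by 1 + U[0, 1)
        delays.append(min(backoff, maximum))  # truncate at the cap
    return delays


# A seeded generator makes the sequence repeatable for inspection.
delays = jittered_backoff(0.2, 120, 50, random.Random(0))
```

The random factor spreads out reconnect attempts from many clients, and the cap keeps any single wait bounded no matter how many attempts have failed.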


Content provided by Stack Overflow.