将重试封装到“with”块中

39

我希望将数据库事务逻辑封装为一个with块:在事务中包装代码并处理各种异常(锁定问题)。这很简单,但我还想让块在特定异常后将代码块重试封装起来。我无法找到将其整齐地打包成上下文管理器的方法。

是否可以在with语句中重复代码?

我希望像这样简单地使用它,这真的很不错。

def do_work():
    ...
    # This is ideal!
    with transaction(retries=3):
        # Atomic DB statements
        ...
    ...

我目前使用装饰器来处理这个问题,但是我更愿意提供上下文管理器(实际上是两者都可以),这样我就可以选择将一些代码行放入 with 块中而不是使用装饰器包装内联函数,这是我目前所做的:

def do_work():
    ...
    # This is not ideal!
    @transaction(retries=3)
    def _perform_in_transaction():
        # Atomic DB statements
        ...
    _perform_in_transaction()
    ...

http://docs.python.org/release/2.5/whatsnew/pep-343.html 看起来有如何实现上下文管理器的示例。 - Vlad
5个回答

16
能否在with语句中重复使用代码? 不行。 正如邮件列表线程中早先指出的那样,您可以通过使装饰器调用传递的函数来减少一些重复。
def do_work():
    ...
    # This is not ideal!
    @transaction(retries=3)
    def _perform_in_transaction():
        # Atomic DB statements
        ...
    # called implicitly
    ...

啊,遗憾它不被支持。感谢提供线程链接。我喜欢将调用隐式化以使其更清晰的想法。如果我想在_perform_in_transaction中设置/修改变量,我猜我仍然需要手动调用它并返回我需要继续执行do_work函数的内容。 - Michael Waterfall

7
我想到的做法是实现一个标准的数据库事务上下文管理器,但允许在构造函数中传入retries参数。然后只需在您的方法实现中包装它。类似这样:
class transaction(object):
    def __init__(self, retries=0):
        self.retries = retries
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, traceback):
        pass

    # Implementation...
    def execute(self, query):
        err = None
        for _ in range(self.retries):
            try:
                return self._cursor.execute(query)
            except Exception as e:
                err = e # probably ought to save all errors, but hey
        raise err

with transaction(retries=3) as cursor:
    cursor.execute('BLAH')

1
你能详细说明 self._cursor 中的 _cursor 是从哪里来的吗? - Mike Müller
1
@MikeMüller 我试图在不陷入实现细节的情况下绘制一些常见的数据库API习惯用法。_cursor 应该是一个 Cursor 对象,适用于特定的数据库连接。完整的实现需要创建和包含某种类型的 Connection 对象,以便实际执行数据库事务。 - Henry Keiter
@HenryKeller 我会这样做 def __init__(self, cursor, retries=0): 并在 __init__ 中加入 self._cursor = cursor'。用法:with transaction(cursor, retries=3) as cursor:`。这样说通吗? - Mike Müller
1
@MikeMüller 当然。我在评论中提到“完整实现”时的意思就是:要完全实现这一点,最好是在构造函数中为ConnectionCursor留出空间,或者将构造函数设置为def __init__(self, dbhost, dbname, user, password):并从那里创建一个Connection对象。我没有在答案中包含这些内容,因为它们与OP的问题不相关,OP的问题是关于如何使用上下文管理器自动重复代码,而不是首先创建DB上下文管理器。 - Henry Keiter
1
@Oddthinking 你说得对,那是等价的。我认为将其作为事务参数的原因只是为了在事务中执行每个查询时都不必指定“重试次数”。 - Henry Keiter
显示剩余2条评论

4

由于装饰器本身就是函数,因此您可以执行以下操作:

with transaction(_perform_in_transaction, retries=3) as _perf:
    _perf()

要了解详细信息,您需要实现transaction()作为一个工厂方法,该方法返回一个对象,其中__callable__()设置为调用原始方法,并在失败时重复执行retries次; __enter__()__exit__()将像通常一样定义为数据库事务上下文管理器。
您也可以设置transaction(),使其本身执行传递的方法多达retries次,这可能需要与实现上下文管理器大致相同的工作量,但意味着实际使用仅需transaction(_perform_in_transaction, retries=3)(实际上与由Delnan提供的装饰器示例等效)。

3

虽然我同意无法使用上下文管理器完成...但可以使用两个上下文管理器来完成!

结果有点尴尬,我还不确定是否赞成自己的代码,但这就是客户端的样子:

with RetryManager(retries=3) as rm:
    while rm:
        with rm.protect:
            print("Attempt #%d of %d" % (rm.attempt_count, rm.max_retries))
             # Atomic DB statements

这里有一个明确的while循环,而且不止一个,而是两个with语句,这给我的喜好留下了太多错误的机会。

以下是代码:

class RetryManager(object):
    """ Context manager that counts attempts to run statements without
        exceptions being raised.
        - returns True when there should be more attempts
    """

    class _RetryProtector(object):
        """ Context manager that only raises exceptions if its parent
            RetryManager has given up."""
        def __init__(self, retry_manager):
            self._retry_manager = retry_manager

        def __enter__(self):
            self._retry_manager._note_try()
            return self

        def __exit__(self, exc_type, exc_val, traceback):
            if exc_type is None:
                self._retry_manager._note_success()
            else:
                # This would be a good place to implement sleep between
                # retries.
                pass

            # Suppress exception if the retry manager is still alive.
            return self._retry_manager.is_still_trying()

    def __init__(self, retries=1):

        self.max_retries = retries
        self.attempt_count = 0 # Note: 1-based.
        self._success = False

        self.protect = RetryManager._RetryProtector(self)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, traceback):
        pass

    def _note_try(self):
        self.attempt_count += 1

    def _note_success(self):
        self._success = True

    def is_still_trying(self):
        return not self._success and self.attempt_count < self.max_retries

    def __bool__(self):
        return self.is_still_trying()

奖励:我知道你不想把工作分成用装饰器包装的单独函数...但是如果你满意,Mozilla 的 redo 包提供了这样的装饰器,所以你不必自己动手。甚至有一个上下文管理器,它有效地充当您的函数的临时装饰器,但它仍然依赖于您可检索的代码被拆分为单个函数。


2
嗯... 我想我们可以稍微简化一下,只需使用 for retry in Retry(retries=3): with retry: 就可以了,这样只有两个外部块。 - user295691
@user295691:我找不出任何问题。看起来是一种改进。 - Oddthinking

-1

这个问题已经几年了,但是在阅读了答案后,我决定尝试一下。

这个解决方案需要使用一个“helper”类,但我认为它提供了一个通过上下文管理器配置重试的接口。

class Client:
    def _request(self):
        # do request stuff
        print("tried")
        raise Exception()

    def request(self):
        retry = getattr(self, "_retry", None)
        if not retry:
            return self._request()
        else:
            for n in range(retry.tries):
                try:
                    return self._request()
                except Exception:
                    retry.attempts += 1


class Retry:
    def __init__(self, client, tries=1):
        self.client = client
        self.tries = tries
        self.attempts = 0

    def __enter__(self):
        self.client._retry = self

    def __exit__(self, *exc):
        print(f"Tried {self.attempts} times")
        del self.client._retry



>>> client = Client()
>>> with Retry(client, tries=3):
    ... # will try 3 times
    ... response = client.request()

tried once
tried once
tried once
Tried 3 times

1
对我来说,这似乎是一种过度复杂化:将“tries”和“attempts”移动到“Client”类中会使“Retry”助手变得多余。此外,这将使整个实现更易读 ;) - TooroSan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接