使用指数退避重新尝试Celery任务

88

对于这样的任务:

from celery.decorators import task

@task()
def add(x, y):
    if not x or not y:
        raise Exception("test error")
    return self.wait_until_server_responds(

如果它抛出一个异常,我想从守护程序的角度重试它,如何应用指数退避算法,即在2^2,2^3,2^4等秒钟后重试?

另外,重试是否由服务器端维护,以便如果工作进程被杀死,下一个生成的工作进程将接管重试任务?

3个回答

160

task.request.retries 属性包含当前任务的尝试次数,因此您可以使用它来实现指数退避:

from celery.task import task

@task(bind=True, max_retries=3)
def update_status(self, auth, status):
    try:
        Twitter(auth).update_status(status)
    except Twitter.WhaleFail as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

为了避免雷击群问题,您可以考虑在指数退避中添加随机的抖动:
import random
self.retry(exc=exc, countdown=int(random.uniform(2, 4) ** self.request.retries))

2
据我所知,倒计时属性设置了MQ后端(例如RabbitMQ)任务的预计时间。因此,它不是在客户端设置的。 - idanzalz
10
对于 celery 3.1 版本,你应该使用 @task(bind=True),celery 将会把 self 作为函数的第一个参数传递进去,所以你需要改变参数为 def update_status(self, auth, status):,这样你就能访问到 self.retries - robbyt
3
谢谢 @robbyt!只需要做一个小更正 - retriesrequest 的属性,所以正确的调用方式是 self.request.retries - tutuDajuju
你能否添加一个使用@task(bind=True)调用任务的例子?通常我只会这样做:from tasks.py import update_status; update_status(auth, status),但是我应该传递什么给self - Matt
请查看此问题的其他答案,以获取内置答案:https://dev59.com/PGkw5IYBdhLWcg3wqMY1#46467851 - jorf.brunning
显示剩余2条评论

56

3
不错的发现,在4.1.0中让我感到困惑,为什么我的“retry_backoff”参数没有被尊重。 - kororo
2
@kororo,似乎self.retry无法正常工作,只能处理其他异常类型。 - rdrey
采用这种方法,您还可以受益于内置的retry_jitter(默认为True),它避免了asksol答案中提到的Thundering Herd Problem。 - qwertysmack
这是正确的答案,因为它是内置的,不需要手动处理倒计时。 - jorf.brunning
当调用retry()时,这个方法是否也适用?至少在Celery 4.2.2上,对于非自动重试似乎不起作用。有人有什么想法吗? - Sarang

3
FYI,celery有一个实用程序函数来计算带抖动的指数退避时间,在这里,所以您不需要编写自己的函数。
def get_exponential_backoff_interval(
    factor,
    retries,
    maximum,
    full_jitter=False
):
    """Calculate the exponential backoff wait time."""
    # Will be zero if factor equals 0
    countdown = min(maximum, factor * (2 ** retries))
    # Full jitter according to
    # https://www.awsarchitectureblog.com/2015/03/backoff.html
    if full_jitter:
        countdown = random.randrange(countdown + 1)
    # Adjust according to maximum wait time and account for negative values.
    return max(0, countdown)

2
将来请避免仅提供链接作为答案,因为链接随着时间的推移往往会失效。最好在您的答案中包含代码片段和解释,以获得最大的赞数和增值。编辑:正如这个答案的链接已经失效一样。https://dev59.com/PGkw5IYBdhLWcg3wqMY1#46467851 - dKen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接