Azure Service Bus:实现处理失败消息的指数重试策略的最佳方法

14

我正在不断地以窥视模式接收消息,如果处理失败(而不是交付),则放弃它们。但是,该消息立即再次可用并被接收以进行处理。它再次快速失败,并在最大交付次数之后被发送到死信队列。

是否有一种方法可以配置主题/订阅,在消息被放弃后等待一段时间才释放它?最好呈指数方式增加等待时间。

当然,我也可以通过代码进行建议。

5个回答

11

Service Bus配置中没有设置指数退避的方法。我曾经遇到过相同的问题,采取了以下措施:

  1. 取消消息排队并将其标记为已接收。
  2. 在try/catch块内执行处理。如果出现异常,则在将来某个时间点上使用计划的交付重新排队。

我们将Service Bus消息队列有效载荷封装在一个指定了传送尝试次数的类中。我们将传送尝试次数乘以一个常数,然后将该数字加到当前时间日期以便在未来进行计划交付。超过我们想要尝试的传送尝试次数后,我们明确地将消息存入死信。

编辑于7-17-2020 考虑使用Azure可靠任务框架,该框架具有可自定义的重试策略。


如果应用程序在步骤1和2之间崩溃,您将丢失消息-考虑使用peek lock功能。 - alastairtree
如果你所说的“崩溃”是指异常,那么这个方法可以正常工作。但如果你所说的“崩溃”是指数据中心消失在蘑菇云中,那么消息确实会丢失。但使用peeklock将使整个算法无效。 - Rob Reagan
4
任何在不幸的时刻停止代码执行的失败都可能导致数据丢失,因为消息已经完成。在某些事务性系统中,这可能是不可接受的,在其他系统中,偶尔的数据丢失问题较小。如果数据丢失是一个问题,请不要将第一条消息标记为已完成,直到第一条消息成功处理或替换消息已成功排队。然后最坏的情况是,在替换消息排队之后并在完成第一条消息之前崩溃 - 在重新启动时,您将获得两条消息但零数据丢失。 - alastairtree
为了完全避免消息丢失,请使用PeekLock并在事务中完成消息,同时安排新消息。更多信息请参见https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-transactions。 - Adam Smejkal

0

你可以使用定时消息来实现这个功能。

基本上,当你需要重试一条消息时,只需将其安排在未来的某个时间,Service Bus 就会在足够的时间过去后再次将其添加到队列中:

        ServiceBusSender queue = ...
        int secs = // calculate the delay

        var msg = new ServiceBusMessage("My scheduled message body")
        {
            ApplicationProperties =
            {
                ["RetryCount"] = retryCount,
            },
        };

        logger.LogInformation("Scheduling for " + secs + " secs");
        await queue.ScheduleMessageAsync(msg, DateTimeOffset.Now.AddSeconds(secs));

您必须在标题或正文中添加有关重试计数的信息,否则您将不知道尝试了多少次,并且无法真正计算未来的日期。


0

另一个选择是使用MassTransit,它支持Azure Service Bus。

看一下它广泛的重试配置

请注意,MassTransit在接收到消息后实际上是在内存中执行重试,因此您需要适当调整主题订阅的MaxDeliveryCountMaxAutoRenewDuration设置。

您的配置可能如下所示:

var busControl = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
    var host = cfg.Host(serviceBusConnectionString, hst => { });

    cfg.UseMessageRetry(retryConfigurator =>
        RetryConfigurationExtensions.Exponential(retryConfigurator, ...);

    cfg.SubscriptionEndpoint(
        "subscriptionName",
        "topicPath",
        e =>
        {
            e.Consumer<SomeConsumer>();
            // Let MassTransit do the retries
            e.MaxDeliveryCount = 1;
            e.MaxAutoRenewDuration = TimeSpan.FromMinutes(10);
        });
});

0
消息在消息锁定期间应保持不可用/隐藏。如果您明确地放弃消息(使用AbandonAsync方法),则锁定将被取消,使消息可供消费;如果您没有明确放弃消息而是完成了进程,则Azure将等待锁定持续时间结束后再次使消息可用,这可能会导致稍微延迟一些。您可以调整此设置以实现重试操作;您还可以根据此处的说明实施更完整的方法(如指数退避)。

-5

我也在研究这个话题,我发现 Microsoft 公司的 RetryExponential 类。

RetryExponential 类

命名空间:Microsoft.ServiceBus
程序集:Microsoft.ServiceBus.dll

表示重试策略的实现。每次必须重新尝试消息操作时,重试间隔以交错指数方式增长。

public sealed class RetryExponential : Microsoft.ServiceBus.RetryPolicy


Bas,如果你能解释一下这个类是如何解决问题的,那就更好了。我从微软网站上摘取了摘要,所以你的回答不会只是一个“链接”,因为这些通常被视为低质量而被删除。 - brasofilo
3
RetryExponential类对于这种情况没有帮助。该类用于控制通过Service Bus API向Service Bus本身发出的请求的重试,而不是队列中消息的重试。例如,RetryExponential可以用于自动重试ReceiveMessage调用、SendMessage调用等。但它对于何时重试失败的消息或何时将其发送到死信队列没有影响。 - Boschy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接