在异常情况下重新排队消息

7
我正在寻找一种可靠的方式重新排队未能正确处理的消息-此时。
我一直在研究http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/,看起来在RabbitMQ API中支持重新排队消息。
else //reject the message but push back to queue for later re-try
{
    Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message);
    model.BasicReject(deliveryArguments.DeliveryTag, true);
}

然而我正在使用EasyNetQ。因此想知道如何在这里实现类似的功能。
bus.Subscribe<MyMessage>("my_subscription_id", msg => {
    try
    {
        // do work... could be long running
    }
    catch ()
    {
        // something went wrong - requeue message
    }
});

这是否是一个好的方法?如果RabbitMQ服务器的ACK等待超时,不ACK消息可能会引起问题。do work的执行时间超过了ACK等待时间。
2个回答

10

所以我想出了这个解决方案。它通过EasyNetQ替换了默认的错误策略。

public class DeadLetterStrategy : DefaultConsumerErrorStrategy
{
    public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer)
    : base(connectionFactory, serializer, logger, conventions, typeNameSerializer)
    {
    }

    public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
    {
        object deathHeaderObject;
        if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject))
            return AckStrategies.NackWithoutRequeue;

        var deathHeaders = deathHeaderObject as IList;

        if (deathHeaders == null)
            return AckStrategies.NackWithoutRequeue;

        var retries = 0;
        foreach (IDictionary header in deathHeaders)
        {
            var count = int.Parse(header["count"].ToString());
            retries += count;
        }

        if (retries < 3)
            return AckStrategies.NackWithoutRequeue;
        return base.HandleConsumerError(context, exception);
    }
}
您可以像这样进行替换:

RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>())

你必须使用 AdvancedBus,因此你需要手动设置一切。

using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()))
{
    var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct);
    var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct);
    var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name);
    bus.Advanced.Bind(deadExchange, queue, "");
    bus.Advanced.Bind(textExchange, queue, "");

    bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info));
}

这将使未能成功传递的消息死信三次。此后,它将进入由EasyNetQ提供的默认错误队列进行错误处理。您可以订阅该队列。

当异常从您的消费者方法传播出来时,消息将被标记为死信。因此,这将触发死信事件。

static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info)
{
    throw new Exception("This is a test!");
}

2
在查看您的DeadLetterStrategy实现时,重新提交消息到队列之间没有延迟。我希望在下一次重新提交之间有一些运行延迟,例如2*t秒的延迟(常见策略是通过乘以某个因子来延迟时间)。这是否可以在不使用Thread.Sleep的情况下完成? - Macko

4
据我所知,使用EasyNetQ无法手动acknackreject消息。
我看到您已经向EasyNetQ团队开了一个问题单, 但是还没有答案。
顺便说一句,在我的所有使用的库中(在NodeJS中),这是一个非常适当的功能集,并且很常见。我很惊讶EasyNetQ不支持这个功能。

如果我想要这个功能,似乎我必须退而使用RabbitMQ.Client。 - Snæbjørn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接