如何延迟 Azure Service Bus 消息?

6

目前我正在使用Microsoft.Azure.ServiceBus.IQueueClientRegisterMessageHandler,然后我接收到的消息类型是Microsoft.Azure.ServiceBus.Message

根据文档

延迟消息API。.NET Framework客户端中的API是BrokeredMessage.Defer或BrokeredMessage.DeferAsync,.NET Standard客户端中的API是MessageReceiver.DeferAsync,并且Java客户端中为IMessageReceiver.defer或IMessageReceiver.deferAsync。

... 但是似乎没有库与我实际使用的类相关。如何延迟?我需要使用哪些类和其他内容才能延迟消息?上面的所有示例都没有足够的代码片段来解释它。

根据@Gaurav的要求更新

从您的答案中,我可以看到我的消息具有该属性:

message.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddHours(1);

但是queueClient也有这个方法:

queueClient.ScheduleMessageAsync(message, DateTime.UtcNow.AddHours(1));

因为我看不到如何在不调用 queueClient 的情况下表达我已经设置了ScheduledEnqueueTimeUtc,所以我将尝试使用 'scheduledMessageAsync'。

4个回答

7

Microsoft.Azure.ServiceBus.Message有一个属性叫做ScheduledEnqueueTimeUtc。只需将此属性的值设置为未来的日期/时间值,您想让消息在队列中出现的时间。消息将被隐藏,直到该时间,并且仅在那个日期/时间出现在队列中。

更新

所以我进行了测试,并确认了ScheduledEnqueueTimeUtcScheduleMessageAsync都可以使用。我使用了Microsoft.Azure.ServiceBusSDK的版本4.1.1

下面是我编写的代码:

    static void Main(string[] args)
    {
        var connectionString = "my-connection-string";
        var queueName = "test";
        QueueClient queueClient = new QueueClient(connectionString, queueName);
        Message msg1 = new Message()
        {
            Body = Encoding.UTF8.GetBytes("This message has ScheduledEnqueueTimeUtc property set. It will appear in queue after 2 minutes. Current date/time is: " + DateTime.Now),
            ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(2)
        };
        queueClient.SendAsync(msg1).GetAwaiter().GetResult();
        Message msg2 = new Message()
        {
            Body = Encoding.UTF8.GetBytes("This message is sent via ScheduleMessageAsync method. It will appear in queue after 2 minutes. Current date/time is: " + DateTime.Now)
        };
        queueClient.ScheduleMessageAsync(msg2, new DateTimeOffset(DateTime.UtcNow.AddMinutes(2))).GetAwaiter().GetResult();
        Console.ReadLine();
    }

当我以Peek-Lock模式获取消息时,我看到的是这个:

enter image description here


它的表现不如预期,我已经设置了那个值,但它仍然一遍又一遍地发送大量的消息。如何让Service Bus知道我已经设置了那个值? - Ninjanoel
我已经更新了我的问题,附上了我尝试过的代码以及即将尝试的内容。 - Ninjanoel
更新了我的回答。希望有所帮助。 - Gaurav Mantri
感谢@Gaurav的努力,但我收到消息而不是发送消息,“过一小时回来”显然行不通(老板说)。 我需要看到文档提示中提到的功能,即我可以使用defer回复消息,然后在我自己的决定下使用sequenceNumber进行关闭。 - Ninjanoel
我认为@Ninjanoel误解了这个解决方案的意图——它通过重新发布原始消息来绕过延迟消息的复杂性。 - cdonner

2
使用消息延迟 API,例如 BrokeredMessage.Defer 或 BrokeredMessage.DeferAsync,将会推迟该消息。
推迟消息将会改变消息的状态,从“Active”变为“Deferred”。可以根据序列号稍后检索该消息。
ScheduleMessageAsync() 用于安排消息的传递(在指定时间发送消息)。它不能在接收到消息后使用。

1

我已经编写了我正在寻找的解决方案,以下是基本概述:

在异步方法内部(运行自己的线程)

public async Task InitialiseAndRunMessageReceiver()

开始一个无限循环,读取消息

receiver = new MessageReceiver(serviceBusConnectionString, serviceBusQueueName, ReceiveMode.PeekLock); 
while (true) { var message = await receiver.ReceiveAsync(); ... more code... }

一旦您知道即将开始长时间任务,请推迟消息,但存储 message.SystemProperties.SequenceNumber。这会使其保留在队列中,但防止它被重新传递。

await receiver.DeferAsync(message.SystemProperties.LockToken);

当你最终完成后,使用message.SystemProperties.SequenceNumber再次请求消息,并将消息完成,就好像它没有被延迟一样。

var message = receiver.ReceiveDeferredMessageAsync(message.SystemProperties.SequenceNumber);
receiver.CompleteAsync(message.Result.SystemProperties.LockToken);

如果您的消息已从队列中删除,则会收到此提示。

我的许多困惑都是由于库的命名方式相似,生命周期重叠而引起的。

Microsoft.Azure.ServiceBus.Core.MessageReceiver 是上述消息接收器。


1

虽然这是一个老问题,但适合我情况的方法是删除消息并使用ScheduleMessageAsync(某处有复制方法)发布副本。然后消息将在所需时间回来。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接