Azure服务总线ReceiveAndDelete

3

我创建了一个示例应用程序,该应用程序将向队列发送消息并读取消息。在此应用程序中,我使用了“ReceiveAndDelete”,以下是示例代码:

创建消息

private static async void CreateMessage(string queueName, string textMessage)
{
    // create a Service Bus client 
    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
    {
        // create a sender for the queue 
        ServiceBusSender sender = client.CreateSender(queueName);

        // create a message that we can send
        ServiceBusMessage message = new ServiceBusMessage(textMessage);

        // send the message
        await sender.SendMessageAsync(message);
        Console.WriteLine($"Sent a single message to the queue: {queueName}");
    }
}

接收消息

// handle received messages
static async Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    Console.WriteLine($"Received: {body}");

    // complete the message. messages is deleted from the queue. 
    await args.CompleteMessageAsync(args.Message);
}

// handle any errors when receiving messages
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    Console.WriteLine(args.Exception.ToString());
    return Task.CompletedTask;
}

static async Task ReceiveMessagesAsync()
{
    var processorOptions = new ServiceBusProcessorOptions
    {
        AutoCompleteMessages = false,
        MaxConcurrentCalls = 1,
        MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
        ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
        PrefetchCount = 1
    };


    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
    {

        // create a processor that we can use to process the messages
        ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);

        // add handler to process messages
        processor.ProcessMessageAsync += MessageHandler;

        // add handler to process any errors
        processor.ProcessErrorAsync += ErrorHandler;
        
        // start processing 
        await processor.StartProcessingAsync();

        Console.WriteLine("Wait for a minute and then press any key to end the processing");
        Console.ReadKey();

        // stop processing 
        Console.WriteLine("\nStopping the receiver...");
        await processor.StopProcessingAsync();
        Console.WriteLine("Stopped receiving messages");
    }
}

主方法

static string connectionString = "***";
static string queueName = "firstqueue";
static async Task Main(string[] args)
{
    try
    {
        await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
        await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");

        await ReceiveMessagesAsync();
    }
    catch (Exception ex)
    {

        throw;
    }
    Console.ReadKey();
}

一切都很好,但是一旦应用程序调用“await processor.StartProcessingAsync();”,即使所有消息尚未处理完毕,所有消息都会从队列中读取。在我的示例中,队列中有两个消息,但是当调用“await processor.StartProcessingAsync();”时,消息计数变为零(基本上是消息出队),并且它开始逐个处理消息。据我理解,如果消息尚未开始处理,则应该在队列中。通过这个示例,只有一个消息应该从队列中删除,第二个消息应该在队列中可见。

这是预期的行为还是我漏掉了什么?


你在Main方法中没有等待任何基于任务的方法,因此你将无法获得可预测的输出。 - Peter Bons
1个回答

2
这是“接收并删除”模式的预期行为,无论客户端是否能够处理消息,Service Bus 都会在将消息发送到客户端后立即删除该消息。从这个 链接 中可以看出:该操作从队列或订阅中接收消息,并以一个原子操作从该队列或订阅中删除该消息。如果您想控制此行为,可以使用PeekLock模式获取消息,处理消息,然后在处理成功后调用消息上的Complete方法来删除该消息。 更新 我尝试了您的代码,下面是我的观察结果:
  1. PrefetchCount = 1时,首次从队列中获取并删除2条消息。之后,会获取并删除单个消息。可能的解释是预取了1条消息,并在请求时获取了1条消息。

  2. PrefetchCount = 0(或从`processorOptions中省略)时,将获取并删除单个消息。

请尝试以下代码:

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

namespace SO67076189
{
    class Program
    {
        static string connectionString = "connection-string";
        static string queueName = "queue-name";
        static async Task Main(string[] args)
        {
            try
            {
                await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 3 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 4 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 5 to test 'ReceiveAndDelete'");

                await ReceiveMessagesAsync();
            }
            catch (Exception ex)
            {

                throw;
            }
            Console.ReadKey();
        }

        private static async Task CreateMessage(string queueName, string textMessage)
        {
            // create a Service Bus client 
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the queue 
                ServiceBusSender sender = client.CreateSender(queueName);

                // create a message that we can send
                ServiceBusMessage message = new ServiceBusMessage(textMessage);

                // send the message
                await sender.SendMessageAsync(message);
                Console.WriteLine($"Sent a single message to the queue: {queueName}");
            }
        }

        static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine($"Received: {body}");

            // complete the message. messages is deleted from the queue. 
            //await args.CompleteMessageAsync(args.Message);
        }

        // handle any errors when receiving messages
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }

        static async Task ReceiveMessagesAsync()
        {
            var processorOptions = new ServiceBusProcessorOptions
            {
                //AutoCompleteMessages = false,
                //MaxConcurrentCalls = 1,
                //MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
                ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
                //PrefetchCount = 1
            };


            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {

                // create a processor that we can use to process the messages
                ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);

                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;

                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;

                // start processing 
                await processor.StartProcessingAsync();

                Console.WriteLine("Wait for a minute and then press any key to end the processing");
                Console.ReadKey();

                // stop processing 
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
        }
    }
}

感谢@Gaurav,我确实理解它会读取消息并将其删除。但是如果有多条消息,为什么它会读取所有消息并删除所有消息呢?链接没有说明它会从队列中读取和删除所有消息。 - Ravi Khambhati
可能是因为 PrefetchCount = 10 导致的吗?你可以尝试将其更改为1或完全删除它吗?有关 PrefetchCount 的更多信息,请参见此处:https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusprocessoroptions.prefetchcount?view=azure-dotnet。 - Gaurav Mantri
@RaviKhambhati 在调查问题之前,请先尝试等待您的方法,这样可能会避免出现问题。 - Peter Bons
@GauravMantri 测试了PrefetchCount = 1,得到了相同的结果。也更新了代码。 - Ravi Khambhati
@PeterBons 根据建议更新了代码。也进行了测试,结果相同。 - Ravi Khambhati
@RaviKhambhati 我更新了我的回答。希望对你有所帮助。 - Gaurav Mantri

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接