我创建了一个示例应用程序,该应用程序将向队列发送消息并读取消息。在此应用程序中,我使用了“ReceiveAndDelete”,以下是示例代码:
创建消息
private static async void CreateMessage(string queueName, string textMessage)
{
// create a Service Bus client
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a sender for the queue
ServiceBusSender sender = client.CreateSender(queueName);
// create a message that we can send
ServiceBusMessage message = new ServiceBusMessage(textMessage);
// send the message
await sender.SendMessageAsync(message);
Console.WriteLine($"Sent a single message to the queue: {queueName}");
}
}
接收消息
// handle received messages
static async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
Console.WriteLine($"Received: {body}");
// complete the message. messages is deleted from the queue.
await args.CompleteMessageAsync(args.Message);
}
// handle any errors when receiving messages
static Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}
static async Task ReceiveMessagesAsync()
{
var processorOptions = new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
PrefetchCount = 1
};
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a processor that we can use to process the messages
ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);
// add handler to process messages
processor.ProcessMessageAsync += MessageHandler;
// add handler to process any errors
processor.ProcessErrorAsync += ErrorHandler;
// start processing
await processor.StartProcessingAsync();
Console.WriteLine("Wait for a minute and then press any key to end the processing");
Console.ReadKey();
// stop processing
Console.WriteLine("\nStopping the receiver...");
await processor.StopProcessingAsync();
Console.WriteLine("Stopped receiving messages");
}
}
主方法
static string connectionString = "***";
static string queueName = "firstqueue";
static async Task Main(string[] args)
{
try
{
await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");
await ReceiveMessagesAsync();
}
catch (Exception ex)
{
throw;
}
Console.ReadKey();
}
一切都很好,但是一旦应用程序调用“await processor.StartProcessingAsync();”,即使所有消息尚未处理完毕,所有消息都会从队列中读取。在我的示例中,队列中有两个消息,但是当调用“await processor.StartProcessingAsync();”时,消息计数变为零(基本上是消息出队),并且它开始逐个处理消息。据我理解,如果消息尚未开始处理,则应该在队列中。通过这个示例,只有一个消息应该从队列中删除,第二个消息应该在队列中可见。
这是预期的行为还是我漏掉了什么?
Main
方法中没有等待任何基于任务的方法,因此你将无法获得可预测的输出。 - Peter Bons