MessageQueue.BeginReceive如何工作,如何正确使用它?

4

我目前有一个后台线程,在这个线程中有一个无限循环。

这个循环会不时地更新数据库中的一些值,并在MessageQueue上监听1秒钟(使用queue.Receive(TimeSpan.FromSeconds(1)) )。

只要没有消息进来,该调用就会内部抛出MessageQueueException(超时),然后被捕获并继续循环。如果有消息,该调用通常会返回并处理该消息,之后循环继续。

这导致了大量的First chance exceptions(每秒钟一次,除非有消息要处理),这会垃圾邮件调试输出,并且在我忘记排除MessageQueueExceptions时也会在调试器中断点。

因此,如何正确处理MessageQueue的异步处理,同时确保只要我的应用程序运行,队列就受到监视,并且数据库也会不时更新。当然,这里的线程不应占用100%的CPU。

我只需要大局观或一些正确完成的异步处理提示。


听起来,没有消息进来的情况并不是一个异常情况,因此抛出异常似乎是一种设计上的问题。 - Jamie Dixon
是的,如果没有要执行的操作,你不能只返回null或默认值吗?为了避免从方法返回而抛出异常有点糟糕。 - Ben Robinson
消息队列类本身在超时发生时抛出异常,而不是我。这就是我对这个解决方案的问题和困扰所在。 - Sebastian P.R. Gingter
超时异常是 MSMQ 的设计原理。你必须每秒轮询队列吗? - John Breakwell
3个回答

5
与其在线程中循环,我建议您注册一个委托以处理消息队列的ReceiveCompleted事件,如此处所述:

使用以下命名空间和代码示例:

using System; using System.Messaging;

namespace MyProject { /// /// 为示例提供容器类。 /// public class MyNewQueue {

    //**************************************************
    // Provides an entry point into the application.
    //       
    // This example performs asynchronous receive operation
    // processing.
    //**************************************************

    public static void Main()
    {
        // Create an instance of MessageQueue. Set its formatter.
        MessageQueue myQueue = new MessageQueue(".\\myQueue");
        myQueue.Formatter = new XmlMessageFormatter(new Type[]
            {typeof(String)});

        // Add an event handler for the ReceiveCompleted event.
        myQueue.ReceiveCompleted += new 
            ReceiveCompletedEventHandler(MyReceiveCompleted);

        // Begin the asynchronous receive operation.
        myQueue.BeginReceive();

        // Do other work on the current thread.

        return;
    }


    //**************************************************
    // Provides an event handler for the ReceiveCompleted
    // event.
    //**************************************************

    private static void MyReceiveCompleted(Object source, 
        ReceiveCompletedEventArgs asyncResult)
    {
        // Connect to the queue.
        MessageQueue mq = (MessageQueue)source;

        // End the asynchronous Receive operation.
        Message m = mq.EndReceive(asyncResult.AsyncResult);

        // Display message information on the screen.
        Console.WriteLine("Message: " + (string)m.Body);

        // Restart the asynchronous Receive operation.
        mq.BeginReceive();

        return; 
    }
}

}

来源:https://learn.microsoft.com/zh-cn/dotnet/api/system.messaging.messagequeue.receivecompleted?view=netframework-4.7.2


很好的例子,感谢@DaveRead。 - Frank Myat Thu
链接已经失效了..是的..人们仍在使用该死的MSMQ :( 如果您考虑处理/限制消息数量,这将变得非常难以处理。 - Piotr Kula
已更新答案,附上代码片段和源链接。 - DaveRead

3
与Jamie Dixon的评论相反,情况确实是异常的。请注意方法及其参数的命名:BeginReceive(TimeSpan timeout)。
如果该方法被命名为BeginTryReceive,则如果没有收到任何消息,则完全正常。将其命名为BeginReceive(或同步版本的Receive)意味着期望有一条消息进入队列。Timeout参数被命名为timeout也很重要,因为超时是异常情况。超时意味着期望收到响应,但未给出任何响应,并且调用者选择停止等待并假定发生了错误。当您使用1秒超时调用BeginReceive / Receive时,您正在声明如果在那时没有消息进入队列,则必须出现问题,我们需要处理它。
如果我正确理解您想要做什么的方式,则我将实现以下内容:
调用BeginReceive,超时时间非常长,甚至没有超时,如果我不将空队列视为错误。
将事件处理程序附加到ReceiveCompleted事件,该事件处理程序1)处理消息,并2)再次调用BeginReceive。
我不会使用无限循环。这既是不良实践,也是使用类似于BeginReceive的异步方法时完全多余的。
编辑:为了放弃没有被任何客户端读取的队列,请让队列编写器查看队列以确定是否“死亡”。
编辑:我有另一个建议。由于我不知道您的应用程序的详细信息,因此不知道它是否可行或适当。在我看来,您基本上正在建立客户端和服务器之间的连接,并将消息队列作为通信渠道。为什么这是“连接”?因为如果没有人收听,队列就不会被写入。我认为这就是连接的基本含义。使用套接字或命名管道传输消息是否更合适?这样,客户端只需在完成阅读时关闭Stream对象,服务器即会立即收到通知。正如我所说,我不知道它是否适用于您正在进行的操作,但它感觉像是一种更合适的通信渠道。

如我之前在问题中所述,定期更新数据库中的表格以告知其他应用程序监听器仍然存活是很重要的。因此,在无限期等待消息时,表格将不会被更新,客户端被视为已死亡,并且不再成为消息的目标。但是,客户端接收发送的消息也很重要。因此,在一个逻辑关注点中同时具有消息监听和表格的持续更新是非常重要的。 - Sebastian P.R. Gingter
正如我所说:表格中列出了所有活动目标队列。当队列在一定时间内没有更新时,它被视为死亡并将从表格中删除。当发送消息时,它将发送到所有活动队列。 - Sebastian P.R. Gingter
好的。我找不到你写这个的地方。无论如何,另一种更好的方法(如果你问我的话)是让队列的编写者通过查看队列是否被客户端删除消息来确定队列是否死亡。 - Christian Palmstierna
不,它不是一种“连接”。它只是一个通知。发送者(一个客户端,可能是在另一台机器上运行的相同代码,但也可能是在其他地方运行的服务)需要通知所有客户端(包括他自己)某些事件。在这种情况下,它查找数据库中的所有活动客户端,并将消息发送到它们的队列中。然后客户端可以(但并不需要)处理该消息。套接字将需要大量连接来通知所有其他客户端,并消耗更多的资源。 - Sebastian P.R. Gingter

3
你是否考虑过使用MessageQueue.GetMessageEnumerator2返回的MessageEnumerator?通过迭代,你可以获得队列的动态内容,并从中检查和删除消息。
  • 如果队列没有消息,则MoveNext()会返回false,你不需要捕获一级异常
  • 如果在迭代开始后有新的消息,则它们将被迭代(如果它们放置在光标之后)。
  • 如果在光标之前有新的消息,则可以重置迭代器或继续进行(如果此时不需要较低优先级的消息)。

是的,谢谢。这很好。所以我可以等待1秒钟,如果没有消息,它就会返回false,我可以保留我的逻辑不变。不再有异常 :) - Sebastian P.R. Gingter

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接