用任务替换无限线程循环(消息泵)

3
在我的应用程序中,我需要监听多个不同的队列并反序列化/分派接收到的队列消息。实际上,我为了实现这一点,每个QueueConnector对象在构造时创建一个新线程,执行一个无限循环,并使用阻塞调用queue.Receive()来接收队列中的下一条消息,如下面的代码所示:
// Instantiate message pump thread
msmqPumpThread = new Thread(() => while (true)
{
   // Blocking call (infinite timeout)
   // Wait for a new message to come in queue and get it
   var message = queue.Receive();

   // Deserialize/Dispatch message
   DeserializeAndDispatchMessage(message);
}).Start();

我想知道是否可以使用任务(Task)代替在新线程上进行无限循环来替换“消息泵”(message pump)。
我已经为接收消息部分创建了一个任务(见下文),但我不知道如何将其用于消息泵(我能否在完成后一遍又一遍地调用相同的任务,并使用延续,以替换在单独线程中的无限循环,就像上面的代码一样?)
Task<Message> GetMessageFromQueueAsync()
{
    var tcs = new TaskCompletionSource<Message>();

    ReceiveCompletedEventHandler receiveCompletedHandler = null;

    receiveCompletedHandler = (s, e) =>
    {
       queue.ReceiveCompleted -= receiveCompletedHandler;
       tcs.SetResult(e.Message);
    };

    queue.BeginReceive();

    return tcs.Task;
}

在这种情况下,使用任务而不是在单独的线程中使用无限循环(带有阻塞调用=>阻塞线程)是否会带来任何好处?如果有的话,如何正确地执行?请注意,此应用程序没有大量的QueueConnector对象,并且不会有(最多可能有10个连接器),因此通过第一种解决方案最多只会有十个线程,因此内存占用/性能启动线程在这里不是问题。我更关心的是调度性能/ CPU使用率。是否会有任何区别?


4
这是一个典型的生产者-消费者场景,可以使用 BlockingCollection 来解决,尤其是使用 BlockingCollection<T>.GetConsumingEnumerable 方法。 - Paolo Moretti
谢谢Paolo! 然而据我所知,生产者/消费者队列更适用于计算密集型任务(执行密集计算),而TaskCompletionSource/异步函数更适用于I/O绑定任务(等待某些事情发生)。 由于我的问题涉及I/O绑定任务(等待消息进入队列),我认为TaskCompletionSource可能更合适。但是我可能错了。 - darkey
1个回答

0
通常情况下,当线程数量较少时,异步代码会产生更多的开销和较低的吞吐量。非阻塞代码在线程数量非常高时最有用,这会导致 a) 由于堆栈而浪费大量内存, b) 上下文切换。然而,它也会产生明显的开销,因为需要更多的分配、更多的间接引用和更多的用户-内核转换。
对于线程数量较少(< 100)的情况,你可能不需要担心。尽量关注编写易维护、抗错性强且简单的代码。使用线程。

感谢您的回答。如果每个QueueConnector对象使用一个专用线程来接收消息(我的当前方法),那么这些线程每次到达阻塞的queue.Receive()调用时都会进行上下文切换。一旦消息到达,线程又会进行另一个上下文切换。这样正确吗? 我已经了解到,任务可以使用线程池来减少启动延迟(在我的情况下不需要),但是通过TaskCompletionSource,它们可以使用回调方法,避免在等待I/O绑定操作时使用线程(意味着没有上下文切换?) - darkey
异步非阻塞代码可以减少上下文切换。如果您想知道哪个版本更快,您需要尝试一下,因为有理由支持两者。我的观点是,这可能不值得麻烦。每秒有多少队列操作?小于10k吗?从性能角度来看,这就不重要了。 - usr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接