在我的应用程序中,我需要监听多个不同的队列并反序列化/分派接收到的队列消息。实际上,我为了实现这一点,每个QueueConnector对象在构造时创建一个新线程,执行一个无限循环,并使用阻塞调用queue.Receive()来接收队列中的下一条消息,如下面的代码所示:
我想知道是否可以使用任务(Task)代替在新线程上进行无限循环来替换“消息泵”(message pump)。
我已经为接收消息部分创建了一个任务(见下文),但我不知道如何将其用于消息泵(我能否在完成后一遍又一遍地调用相同的任务,并使用延续,以替换在单独线程中的无限循环,就像上面的代码一样?)
// Instantiate message pump thread
msmqPumpThread = new Thread(() => while (true)
{
// Blocking call (infinite timeout)
// Wait for a new message to come in queue and get it
var message = queue.Receive();
// Deserialize/Dispatch message
DeserializeAndDispatchMessage(message);
}).Start();
我想知道是否可以使用任务(Task)代替在新线程上进行无限循环来替换“消息泵”(message pump)。
我已经为接收消息部分创建了一个任务(见下文),但我不知道如何将其用于消息泵(我能否在完成后一遍又一遍地调用相同的任务,并使用延续,以替换在单独线程中的无限循环,就像上面的代码一样?)
Task<Message> GetMessageFromQueueAsync()
{
var tcs = new TaskCompletionSource<Message>();
ReceiveCompletedEventHandler receiveCompletedHandler = null;
receiveCompletedHandler = (s, e) =>
{
queue.ReceiveCompleted -= receiveCompletedHandler;
tcs.SetResult(e.Message);
};
queue.BeginReceive();
return tcs.Task;
}
在这种情况下,使用任务而不是在单独的线程中使用无限循环(带有阻塞调用=>阻塞线程)是否会带来任何好处?如果有的话,如何正确地执行?请注意,此应用程序没有大量的QueueConnector对象,并且不会有(最多可能有10个连接器),因此通过第一种解决方案最多只会有十个线程,因此内存占用/性能启动线程在这里不是问题。我更关心的是调度性能/ CPU使用率。是否会有任何区别?
BlockingCollection
来解决,尤其是使用BlockingCollection<T>.GetConsumingEnumerable
方法。 - Paolo Moretti