消息队列和异步/等待

11

我只想以异步方法接收我的消息!但这会导致我的用户界面冻结。

    public async void ProcessMessages()
    {
        MessageQueue MyMessageQueue = new MessageQueue(@".\private$\MyTransactionalQueue");
        MyMessageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

        while (true)
        {
            MessageQueueTransaction MessageQueueTransaction = new MessageQueueTransaction();
            MessageQueueTransaction.Begin();

            ContainError = false;
            ProcessPanel.SetWaiting();

            string Body = MyMessageQueue.Receive(MessageQueueTransaction).Body.ToString();

            //Do some process with body string.

            MessageQueueTransaction.Commit();
        }
    }

我就像调用任何常规方法一样调用这个方法,但它不起作用!当我使用BackgroundWorkers而不是async/await时,这段代码曾经有效。

有什么建议吗?


2
你的代码是否产生了任何警告?你有阅读并尝试理解它们所说的内容吗? - svick
在您的错误列表中查看,您会看到“此异步方法缺少'await'运算符,将以同步方式运行”。 - Peter Ritchie
1
这不是一个本地化的问题 - 实际上,在SO中有许多类似的问题,人们认为 async 会使他们的方法异步运行。 - Panagiotis Kanavos
2个回答

27

正如Stephen所写的那样,异步不会在线程中运行你的代码。好在,你可以使用TaskFactory.FromAsyncMessageQueue.BeginReceive/MessageQueue.EndReceive来异步接收消息:

    private  async Task<Message> MyAsyncReceive()
    {
        MessageQueue queue=new MessageQueue();
        ...
        var message=await Task.Factory.FromAsync<Message>(
                           queue.BeginReceive(),
                           queue.EndReceive);

        return message;

    }

请注意,BeginReceive没有使用事务的版本。根据BeginReceive的文档:
不要使用异步调用BeginReceive与事务一起使用。如果您想执行事务性异步操作,请调用BeginPeek,并将事务和(同步)Receive方法放在为peek操作创建的事件处理程序中。
这是有道理的,因为无法保证需要等待多长时间才能获得响应或最终处理已完成调用的线程是哪个。
要使用事务,您应编写类似以下内容的代码:
    private  async Task<Message> MyAsyncReceive()
    {
        var queue=new MessageQueue();

        var message=await Task.Factory.FromAsync<Message>(queue.BeginPeek(),queue.EndPeek);

        using (var tx = new MessageQueueTransaction())
        {
            tx.Begin();

            //Someone may have taken the last message, don't wait forever
            //Use a smaller timeout if the queue is local
            message=queue.Receive(TimeSpan.FromSeconds(1), tx);
            //Process the results inside a transaction
            tx.Commit();
        }
        return message;
    }

更新

正如Rob所指出的那样,原始代码使用了从Peek返回的message,这可能在PeekReceive之间发生了更改。在这种情况下,第二个消息将丢失。

然而,仍有可能会阻塞,如果另一个客户端读取了队列中的最后一条消息。为了防止这种情况发生,Receive应该设置一个较小的超时时间。


1
这个回答很棒!并且它可行!如果我想多次运行这个 MyAsyncReceive 方法,它们会共享我的线程吗?那它们会变得慢吗? - Fraga
1
这个方法并没有什么特别的。运行时会从线程池中使用一个可用的线程来执行每次调用。 - Panagiotis Kanavos
这段代码中是否包含竞争条件呢?如果存在多个消费者,那么 message 变量可能会包含与接收到的不同的消息,是吗? Peek 方法将返回 一个 消息,但 Receive() 方法可能会接收到不同的消息。我建议在“Peek awaiter”中删除 var message = 并使用 var message = queue.Receive(tx); 来确保正确的消息在事务中被处理。 - RobIII
交易与第一个示例相比,到底有哪些好处? - user1713059
@user1713059 交易是队列处理的重要组成部分。整个模型基于这样一种假设:如果出现问题,例如队列处理程序崩溃,则消息将被放回队列中。好的消息最终会重新处理。坏的消息则会进入死信队列。云服务提供商用租赁替代了事务处理,这更具可扩展性,但会导致重试速度变慢。 - Panagiotis Kanavos
显示剩余2条评论

6

async 不会在后台线程上运行您的代码。 您上面的代码应该会导致编译器警告,告诉您该方法将同步运行。

如果要在后台线程上执行方法,请使用 TaskEx.Run

public void ProcessMessages()
{
  ...
}

TaskEx.Run(() => ProcessMessages());

如果我使用run,它会创建一个新的线程!因此,我无法直接修改UI,我需要使用Dispatch对吗?就像普通线程一样?还是有其他方法?我只需要更新主窗体中的进度条。 - Fraga
我建议使用 IProgress<T>Progress<T>,它们可以为您处理线程调度。TAP文档涵盖了IProgress<T> - Stephen Cleary
我无法将两个答案标记为正确!我的问题的答案是使用Task.Factory.FromAsync,但这种方式不会利用新线程的优势。因此,我将使用Run!谢谢。 - Fraga

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接