数据流 (任务并行库) 和异步等待

4
假设我在.NET中使用Dataflow块。它被说明是“此数据流模型促进了基于Actor的编程”,这正是我想要实现的。
然而,如果我从一个BufferBlock<T>处理消息,并且在消息处理器中决定使用async/await,那么这将在当前线程和等待任务的工作线程中分叉执行。
有没有办法在此处防止Actor/消息处理器内的并发执行?
如果等待的任务使用本地回调进行非阻塞IO操作执行,那就没问题了。但我真的希望确保任何.NET代码都只能同步执行。

2
有一个具体的例子会更容易解释答案。你能发一下你的代码吗? - Panagiotis Kanavos
有趣的事实:这是Akka.NET非常早期的工作。 - Roger Johansson
2个回答

3
这很大程度上取决于您如何处理这些消息。
如果您使用带有异步操作的 ActionBlock,不设置其 MaxDegreeOfParallelism(即使用默认值1),并将其链接到 BufferBlock,则 action 将一次执行一个消息,没有并行处理。
如果您手动处理消息,可以使用以下循环:
while (await bufferBlock.OutputAvailableAsync())
{
    var message = await bufferBlock.ReceiveAsync();
    await ProcessMessageAsync(message);
}

那么这些消息也将逐个进行处理。

但是,在这两种情况下,这并不意味着一条消息将由单个线程处理。它可能会被多个线程处理,但不是并行处理。因为在使用await之后,执行可以在不同的线程上恢复,而不是在暂停的地方。

如果您使用其他方式处理消息(例如使用上面的循环,但省略ProcessMessageAsync()之前的await),则多个消息可以同时处理。


1
你误解了await的作用。它不会分叉任何东西,只是等待已经异步操作的结果。
使用async关键字标记的方法并不会自动变成异步的。只有在async方法内部遇到异步操作时,执行才会异步进行。 async关键字只告诉编译器异步操作完成后执行应该继续哪里。
在等待期间没有线程池线程被浪费或受损,因此您不应该尝试限制、防止或规避这一点。事实上,使用异步操作可以获得更好的可伸缩性,因为TPL Dataflow使用的ThreadPool线程不会阻塞等待长时间运行的异步操作,如I/O或Web服务调用。

好的,我的措辞有误,让我们假设以下示例:我的“actor”正在处理来自我的缓冲块的消息,在此处理中,我们遇到了一个“await”语句,执行现在返回给调用者,对吧?一旦任务完成,剩余的代码将被执行。这难道不会在等待的任务执行时打开处理器以开始处理新消息的机会吗? - Roger Johansson
不完全正确。ActionBlocks始终异步执行其操作,尊重其执行选项以限制并发任务的数量。如果您进行另一个异步调用,例如HttpClient.GetStringAsync,而没有添加await,则执行将立即继续。如果您添加了await,则只有在GetStringAsync完成后才会继续执行。 - Panagiotis Kanavos
这个不起作用,我已经尝试过了,await块在不同的线程中执行,而不是消息处理器。 - Roger Johansson
这就是TPL的一般工作方式。你为什么认为这是一个问题或者它“不起作用”?操作执行始于一个线程池线程。当您使用await调用异步方法时,它会在另一个线程池线程中继续执行。当它完成后,await后面的代码将在另一个线程中执行。只有当您省略await时,任何混淆才会被引起:在这种情况下,异步和ActionBlock操作都可能同时运行。 - Panagiotis Kanavos

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接