您所描述的任务可以很好地适用于
TPL Dataflow库,这是一个
TPL
的小附加组件(它可以通过
nuget包包含在项目中,支持.NET 4.5),您只需轻松引入类似以下的流程即可(根据
BroadcastBlock
的评论更新了代码):
var buffer = new BroadcastBlock<string>();
var consumer1 = new TransformBlock<string, string>(s => { });
var consumer2 = new TransformBlock<string, string>(s => { });
var resultsProcessor = new ActionBlock<string>(s => { });
我不确定你的解决方案逻辑,所以我认为你只是在这里操作字符串。你应该异步发送所有传入的数据作为第一个块(如果你Post
你的数据,如果缓冲区过载,消息将被丢弃),并在它们之间链接块,就像这样:
buffer.LinkTo(consumer1, new DataflowLinkOptions { PropagateCompletion = true });
buffer.LinkTo(consumer2, new DataflowLinkOptions { PropagateCompletion = true });
consumer1.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true });
consumer2.LinkTo(resultsProcessor, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var s in IncomingData)
{
await buffer.SendAsync(s);
}
buffer.Complete();
如果你的消费者都需要处理
所有项目,那么你应该使用
BroadcastBlock
(可能会出现一些
有关保证传递的问题),另一个选项是通过消费者过滤消息(也许通过消息ID除以消费者数量的余数),但在这种情况下,你应该链接到另一个消费者,该消费者将“捕获”由于某种原因未被消费的所有消息。
如你所见,块之间的链接是完全传播的,因此在此之后,你可以简单地附加到
.Completion
任务属性以获取
resultsProcessor
:
resultsProcessor.Completion.ContinueWith(t => { /* Processing is complete */ })
TPL Dataflow NuGet Package
(https://www.nuget.org/packages/Microsoft.Tpl.Dataflow)吗? - VMAtm