编辑:事实证明我错了。即使配置为并行处理,
TransformBlock
仍然按照它们进入的顺序返回项目。因此,我原始答案中的代码完全无用,可以使用普通的
TransformBlock
代替。
原始答案:
据我所知,在.Net中只有一个并行性构造支持按照它们进入的顺序返回处理过的项目:带有
AsOrdered()
的PLINQ。但是我认为PLINQ不适合您想要的东西。
另一方面,TPL Dataflow非常适合,但它没有支持同时支持并行性和按顺序返回项的块(
TransformBlock
支持这两个功能,但不能同时使用)。幸运的是,Dataflow块是以组合性为设计思路的,因此我们可以构建自己的块来实现这一点。
但首先,我们必须弄清楚如何对结果进行排序。像您建议的那样使用并发字典以及一些同步机制肯定会起作用。但是我认为有一个更简单的解决方案:使用一个
Task
队列。在输出任务中,您出队一个
Task
,等待它完成(异步地),并在完成后发送其结果。对于队列为空的情况,我们仍然需要一些同步,但是如果聪明地选择要使用的队列,我们可以免费获得它。
因此,总体思路如下:我们正在编写一个
IPropagatorBlock
,具有一些输入和一些输出。创建自定义
IPropagatorBlock
的最简单方法是创建一个处理输入的块,另一个生成结果的块,并将它们视为一个块,使用
DataflowBlock.Encapsulate()
进行处理。
输入块必须按正确的顺序处理传入的项目,因此不会并行化。它将创建一个新的
Task
(实际上是一个
TaskCompletionSource
,以便稍后设置
Task
的结果),将其添加到队列中,然后发送要处理的项目,以及一种设置正确的
Task
结果的方法。因为我们不需要将此块链接到任何东西,所以可以使用
ActionBlock
。
输出块将需要从队列中获取任务,异步等待它们,然后将它们发送出去。但由于所有块都嵌入了队列,并且具有委托的块内置了异步等待,因此这将非常简单:
new TransformBlock<Task<TOutput>, TOutput>(t => t)
。该块将同时作为队列和输出块工作。因此,我们不必处理任何同步问题。
最后一部分拼图实际上是并行处理项目。为此,我们可以使用另一个
ActionBlock
,这次设置了
MaxDegreeOfParallelism
。它将接收输入,处理它,并将结果设置为队列中正确的
Task
。
组合起来,它可能看起来像这样:
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
经过这么多的讨论,我认为这是相当少量的代码。
看起来你非常关心性能,因此您可能需要优化此代码。例如,将processor
块的MaxDegreeOfParallelism
设置为类似于Environment.ProcessorCount
的值,以避免超额订阅。此外,如果对您而言延迟比吞吐量更重要,则可以将同一块的MaxMessagesPerTask
设置为1(或另一个较小的数字),以便在完成项目处理时立即将其发送到输出。
此外,如果您想限制传入的项,可以将enqueuer
的BoundedCapacity
设置为适当的值。
TransformBlock
无法同时并行处理项目并按顺序返回它们。它可以执行其中一个操作,但不能同时执行两个操作。 - svickLinkTo()
,而是在enqueuer
中两次使用了SendAsync()
。我可以将enqueuer
变成一个TransformBlock
并使用一个LinkTo()
和一个SendAsync()
,但我更喜欢这种方式,因为它“对称”。ActionBlock
通常是一个终点,但其中的代码可以做任何你想做的事情,包括将项发送到其他块,这就是我所做的。 - svickActionBlock
(enqueuer
),然后是另一个块,它是TransformBlock
(queue
),最后是另一个块,它是ActionBlock
(processor
)。 - svick