以下是`Traverse`方法的两个并行版本。第一个是最简单的版本。`ParallelTraverse`为根节点及其后代(子节点、子节点的子节点等)调用同步委托。该迭代器使用
Stack<T>
而不是
Queue<T>
,以提高内存效率。`Stack<T>`每次只包含少量节点,相反,
Queue<T>
将存储超过节点总数一半的数量。关于树的各种遍历方式,您可以查看
this问题。
public static ParallelLoopResult ParallelTraverse<TNode>(
TNode root,
ParallelOptions parallelOptions,
Action<TNode, ParallelLoopState> body,
Func<TNode, IReadOnlyList<TNode>> childrenSelector)
{
ArgumentNullException.ThrowIfNull(parallelOptions);
ArgumentNullException.ThrowIfNull(body);
ArgumentNullException.ThrowIfNull(childrenSelector);
IEnumerable<TNode> Iterator()
{
Stack<TNode> stack = new();
stack.Push(root);
while (stack.Count > 0)
{
TNode node = stack.Pop();
yield return node;
IReadOnlyList<TNode> children = childrenSelector(node);
if (children is null) continue;
for (int i = children.Count - 1; i >= 0; i--)
stack.Push(children[i]);
}
}
return Parallel.ForEach(Iterator(), parallelOptions, body);
}
用法示例:
ParallelOptions options = new() { MaxDegreeOfParallelism = Environment.ProcessorCount };
ParallelTraverse(root, options, (node, state) =>
{
if (node.Property == someValue) DoSomething(node);
}, node => node.Children);
仅有
body
被并行处理。
childrenSelector
按顺序调用(一次一个节点)。
第二个版本具有相同的签名和功能,只在行为上有所不同。
ParallelTraverseHierarchical
确保在父节点的
body
完成之前不会调用所有节点的
body
。例如,在将节点保存到数据库中,并且保存节点需要其父节点的自动生成
ID
的情况下,这可能很重要。
public static ParallelLoopResult ParallelTraverseHierarchical<TNode>(
TNode root,
ParallelOptions parallelOptions,
Action<TNode, ParallelLoopState> body,
Func<TNode, IReadOnlyList<TNode>> childrenSelector)
{
ArgumentNullException.ThrowIfNull(parallelOptions);
ArgumentNullException.ThrowIfNull(body);
ArgumentNullException.ThrowIfNull(childrenSelector);
using BlockingCollection<TNode> stack = new(new ConcurrentStack<TNode>());
stack.Add(root);
int pendingCount = stack.Count;
Partitioner<TNode> partitioner = Partitioner.Create(
stack.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
return Parallel.ForEach(partitioner, parallelOptions, (node, state) =>
{
try
{
body(node, state);
if (state.ShouldExitCurrentIteration) return;
IReadOnlyList<TNode> children = childrenSelector(node);
if (children is null) return;
Interlocked.Add(ref pendingCount, children.Count);
for (int i = children.Count - 1; i >= 0; i--)
stack.Add(children[i]);
}
finally
{
if (Interlocked.Decrement(ref pendingCount) == 0)
stack.CompleteAdding();
}
});
}
这个版本同时并行处理
body
和
childrenSelector
。其中一个委托应该是粗重的。如果两者都是轻量级的(在微秒级别),通过
BlockingCollection<T>
传递每个单独的节点的开销可能会抵消并行化的好处。我在
这里上传了一个更复杂的
ParallelTraverseHierarchical
方法实现,基于一堆堆栈,可以每秒遍历数百万个节点。
上述情况中,在数据库中保存一棵树需要满足以下条件:TNode类型必须具有可变的数据库Id属性,并且必须引用其父节点。这样,每个节点都能够检索到其父节点的自动生成ID。如果无法满足此要求,可以通过将body参数的类型从Action更改为Func来增强上述实现。TArg将是一个泛型参数,用于向子节点传递关于其父节点的信息。这些信息可以通过阻塞集合BlockingCollection<(TNode, TArg)>进行传递。对于根节点root,TArg可以作为额外的参数传递给方法。