在C#中进行并行树遍历

14
我需要快速遍历一棵树,而且我想要并行地进行。我宁愿使用并行扩展而不是手动启动一堆线程。
我的当前代码大致如下:
public void Traverse(Node root)
{
    var nodeQueue = new Queue<Node>();
    nodeQueue.Enqueue(root);
    while (nodeQueue.Count != 0)
    {
        var node = nodeQueue.Dequeue();
        if (node.Property == someValue) DoSomething(node);
        foreach (var node in node.Children)
        {
            nodeQueue.Enqueue(node);
        }
    }
}

我真的希望Parallel.ForEach有一个类似于Parallel.While的模拟。我在Stephen Toub的文章Implementing Parallel While with Parallel.ForEach中找到了这个方法。如果我理解正确的话,这仍然不起作用,因为我正在改变我试图迭代的队列。
我需要使用任务工厂和递归吗(这样做是否有风险?)?还是我忽视了一些简单的解决方案?
@svick:
树上有超过250,000个节点。 目前的最大深度是包括根节点在内的14个节点。
根节点之后有大约500个节点,其余节点的分布相对随机。 我很快会提供一些更好的分布统计数据。
@Enigmativity:
是的,这棵树正在被多个用户并发修改,但通常我会为树或子树使用共享读锁,或者允许脏读取。
调用node.Children可以被视为原子操作。

DoSomething真的只是几个委托之一,对于一些昂贵的操作,我可能会收集一个节点的快照列表,并在遍历之外进行处理。

我意识到我应该看一下一般情况(而不是整个树被遍历的情况)。为此,我对树的每个节点运行了遍历,并查看了总时间。

我使用了Parallel.ForEach(nodes, Traverse)来执行每个遍历算法,其中nodes包含了大约250k个节点。这模拟了很多用户同时请求很多不同的节点。

00256ms 广度优先顺序遍历

00323ms 带有工作的广度优先顺序遍历(我递增了一个静态计数器作为“工作”)

01495ms Kirks第一个答案

01143ms Svicks第二个答案

00000ms 递归单线程在60秒后未完成

00000ms Enigmativity的答案在60秒后未完成

@Enigmativity,我觉得我可能在某种程度上搞乱了你的算法,因为它似乎应该更快。

结果让我感到非常惊讶。 为了确信编译器没有神奇地优化遍历过程,我不得不在广度优先顺序中添加一些工作。
对于头部的单次遍历,只有并行化第一层才能获得最佳性能。但是仅仅是勉强达到这个数值,当我将第二层的节点数量增加到2000(而不是500)时,这个数值有所改善。

1
我不确定并行化这个程序是一个好主意。如果你在一开始就知道需要多少次迭代,那么并行化是有意义的。但在这里,直到访问子节点之前,你不一定知道需要多少次迭代。在我看来,一个简单的生产者/消费者模型就足够了。 - Jeff Mercado
@Jeff,为什么只有在您确切知道迭代次数时并行化才有意义? - svick
@svick:当你开始遍历一棵树时,你只能访问一个节点,即根节点。在实际访问节点之前,你不知道也无法访问其他节点。所以对我来说,想要并行遍历单个节点是很奇怪的。对我来说,更自然的做法是列出要访问的节点列表。当访问一个节点时,将其子节点添加到列表中,并继续进行,直到所有在列表中找到的节点都被访问过为止。这就是生产者/消费者模式对我有意义的地方,其中所有线程都是生产者和消费者。 - Jeff Mercado
@Jeff,显然你不能并行遍历一个节点,但是遍历整个树是有意义的。如果你的意思是你将有多个生产者/消费者线程,那么这是一种并行化代码的方法。 - svick
抱歉,我觉得我可能表达不清楚。我不打算使用Parallel.ForEach(),我提到希望有一个类似的Parallel.While()。想象一下我看到Parallel.Do()时的短暂快乐 ;_) - Jason Hernandez
显示剩余4条评论
7个回答

7
最直接的方法是为每个子节点创建一个Task,然后等待它们全部完成:
public void Traverse(Node root)
{
    if (node.Property == someValue)
        DoSomething(node);

    var tasks = new List<Task>();

    foreach (var node in node.Children)
    {
        // tmp is necessary because of the way closures close over loop variables
        var tmp = node;
        tasks.Add(Task.Factory.StartNew(() => Traverse(tmp)));
    }

    Task.WaitAll(tasks.ToArray());
}
Task 是一个相对较轻的任务,因此创建大量的任务可以得到合理的效果。但是它们确实有一些开销,所以做一些更复杂的事情,比如让几个任务共享一个队列,可能会更快。如果您选择这种方式,请不要忘记,空队列并不意味着所有的工作都已完成。如果您采用这种方式,System.Collections.Concurrent 命名空间中的类将非常有用。 编辑: 由于树的形状(根节点大约有500个子节点),仅并行处理第一层应该可以获得良好的性能:
public void Traverse(Node root, bool parallel = true)
{
    if (node.Property == someValue)
        DoSomething(node);

    if (parallel)
    {
        Parallel.ForEach(node.Children, node =>
        {
            Traverse(node, false);
        });
    }
    else
    {
        foreach (var node in node.Children)
        {
            Traverse(node, false);
        }
    }
}

2
我尝试了这个方法,但不幸的是它的表现比我之前提问中的单线程代码更差... 它需要大约20%更多的CPU时间,而且令人惊讶的是,需要300%更多的时钟时间。我在ANTS分析器中测量了两者,并进行了额外的秒表测试,以确保分析器本身没有导致减速。 - Jason Hernandez
另外一件事,System.Collections.Concurrent非常有用,我也会在某些地方加入一些FSharpSet。 - Jason Hernandez

3
我可能漏掉了什么,但我根本不觉得需要使用whilewhile只是确保您遍历每个节点。
相反,只需为树中的每个节点递归调用函数即可。
public void Traverse(Node root)
{         
    if (root.Property = someValue) DoSomething(node);    
    Parallel.ForEach<Node>(root.Children, node => Traverse(node));
} 

编辑:当然,如果您喜欢水平处理而不是垂直处理,并且您的昂贵操作是DoSomething,则可以先进行遍历

public IEnumerable<Node> Traverse(Node root)
{
    // return all the nodes on this level first, before recurring
    foreach (var node in root.Children)
    {
        if (node.Property == someValue)
            yield return node;
    }

    // next check children of each node
    foreach (var node in root.Children)
    {
        var children = Traverse(node);
        foreach (var child in children)
        {
            yield return child;
        }
    }
}

Parallel.ForEach<Node>(Traverse(n), n => DoSomething(n));

看起来很简单,但有很多嵌套的Parallel.Foreach。注意使用队列遍历树,使其横向而不是递归纵向;这可能会改变逻辑(但在这种情况下,只有在DoSomething存在副作用时才会改变)。 - FastAl
我认为这以一种简洁、简单的方式解决了这个“问题”。 - Kirk Broadhurst
但是我为水平遍历添加了另一种解决方案,只是为了好玩。 - Kirk Broadhurst

2
由于树的遍历非常快,调用Children是原子性的,并且需要并行执行的DoSomething委托的昂贵性质,这是我对解决方案的看法。
我从需要一个以节点为参数的函数开始,创建一个执行DoSomething的任务,递归调用自身以创建所有子节点的任务,最后返回等待所有内部任务完成的任务。
代码如下:
Func<Node, Task> createTask = null;
createTask = n =>
{
    var nt = Task.Factory.StartNew(() =>
    {
        if (n.Property == someValue)
            DoSomething(n);
    });
    var nts = (new [] { nt, })
        .Concat(n.Children.Select(cn => createTask(cn)))
        .ToArray();

    return Task.Factory.ContinueWhenAll(nts, ts => { });
};

只需要调用它并等待遍历完成即可:
createTask(root).Wait();

我通过创建一个根节点下有500个子节点,14层深度,每个节点有1或2个子节点的节点树来进行测试。这使我总共拥有319,501个节点。
我创建了一个DoSomething方法来执行一些工作 - for (var i = 0; i < 100000 ; i++) { }; - 然后运行上面的代码并将其与串行处理相同的树进行比较。
并行版本花费了5,151毫秒,而顺序版本则需要13,746毫秒。
我还进行了一个测试,将节点数减少到3,196,并将DoSomething的处理时间增加了100倍。如果任务很快完成,TPL会非常聪明地回归到顺序运行,因此延长处理时间会使代码更具并行性。
现在并行版本需要3,203毫秒。顺序版本需要11,581毫秒。如果我只调用createTask(root)函数而不等待它完成,那么它只需要126毫秒。这意味着树的遍历非常快,然后在处理时锁定树,在遍历时解锁树是有意义的。
希望这能帮助到您。

只是出于好奇,你是用广度优先的单线程算法进行比较,还是递归算法? - Jason Hernandez
@Jason - 我使用了递归、广度优先、单线程算法。你为什么认为它只能是其中之一而不是两者兼备呢? - Enigmativity
在我的测试中,单线程递归函数非常慢。我没有看到你所看到的速度提升,所以我想知道你将算法与什么进行了比较。 - Jason Hernandez
@Jason - 我测试了我的算法以及其他在这里发布的答案。一旦我通过让DoSomething花费足够长的时间来平衡TPL异步执行它,我发现每个人的答案都差不多,包括我的答案。对于构建TPL的人来说,这是一个大的成功——它确实在性能和灵活性方面非常出色。 - Enigmativity

1
假设您有p个处理器,您可以使用p个分区对root.Children进行Parallel.For。每个分区都将在子树上执行传统的单线程遍历、比较操作,但不是执行DoSomething,而是将一个委托加入到并发队列中,以便稍后执行该操作。如果分配基本上是随机和平衡的,并且由于遍历只执行遍历/加入委托操作,因此这部分时间为1/p。此外,遍历可能会在所有的DoSomethings执行之前耗尽自身,所以您可以拥有p个消费者(DoSomething执行者),从而实现最大并行执行,假设这些操作都是独立的。
通过将根节点子项数目进行简单的划分,并将其随机分配给子树,遍历本身将变得非常快速。通过将消费者粗略地分配到每个处理器上,您还可以获得最大并行 DoSomething 操作。

1
人们经常会过度复杂化这种问题,使用命令式的代码风格,导致所有复杂的工作都在递归函数内部完成。可以预见的是,遍历节点非常快速,您希望从调用DoSomething中获得并行价值,对吗?
以下是一种简单且通用的方法,可用于从任何类似树形结构中获取每个节点:
public IEnumerable<T> GetSelfAndDescendants<T>(T root, Func<T, IEnumerable<T>> getChildren)
{
    yield return root;
    foreach (var descendants in getChildren(root).SelectMany(child => GetSelfAndDescendants(child, getChildren)))
    {
        yield return descendants;
    }
}

有了这个工具,你可以做出类似于这样的事情:

GetSelfAndDescendants(rootNode, n => n.Children)
    .Where(node => node.Property == someValue)
    .AsParallel()
    .ForAll(DoSomething);

这将遍历树的问题与处理其节点的问题分开,这意味着递归部分的过程不必使并行部分复杂化。

此外,由于现在并行库只有一个入口点(对AsParallel()的调用仅发生一次),因此您可以更多地控制同时执行的任务数量。这很重要,因为如果您启动了一万个并行线程,它们都只会争夺资源,并不会真正加快完成速度。我怀疑这就是其他答案性能差的原因:它们在每个递归级别上都会启动新线程,因此即使每个Parallel.ForEach都限制并发性,也会有数十个或数百个它们同时被调用。

使用我详细说明的方法,您可以让.NET为您选择合理的默认值,或者您可以调用.WithDegreeOfParallelism(...)来尝试不同数量的并发任务。

根据我的测试用例,使用DoSomething只运行一个未经优化的for (var i = 0; i < 100000 ; i++) { },这需要的时间大约是相同集合上简单的foreach的十分之一。

我应该指出,如果DoSomething不适合并行处理(例如它在共享资源上锁定),您可能仍然会发现串行处理项目更快。

DoSomething实际上是几个委托之一,对于一些昂贵的操作,我可能会收集节点的快照列表并在遍历之外处理它们。

这对于该用例很有效:只需在适当的位置添加.ToList()以捕获节点即可。

更新

假设,遍历节点非常快,你希望从并行调用DoSomething中获得价值,对吧?

不是。在这种情况下,我正在尝试优化实际的遍历。这是针对一个近乎实时处理大量数据的系统,我们试图保持整个系统调用而不仅仅是遍历在200ms以下。

在这种情况下,按照svick的建议仅并行化树的顶级子节点可能是合理的。如果DoSomething()几乎什么也不做,那么从调用此方法开始可以显著提高性能:

public void Hybrid(Node root)
{
    if (root.Property == someValue) DoSomething(root);
    Parallel.ForEach(root.Children, Traverse);
}

基准测试结果

基准测试: http://share.linqpad.net/gpc4pc.linq

当然,这假定你可以真正从多核中获益(即你的系统没有同时处理一堆其他操作)。如果遍历树是你的性能瓶颈,并且你发现自己经常遍历树,则应考虑是否可以使用不同的数据结构和算法更好地解决问题。


你可能认为遍历节点非常快,希望通过并行调用DoSomething来获得价值,对吗?不是的。在这种情况下,我试图优化的是实际的遍历过程。这是一个处理大量数据的系统,需要近乎实时地完成操作,并且我们试图保持整个系统调用时间低于200毫秒,而不仅仅是遍历时间。 - Jason Hernandez

1
以下是`Traverse`方法的两个并行版本。第一个是最简单的版本。`ParallelTraverse`为根节点及其后代(子节点、子节点的子节点等)调用同步委托。该迭代器使用Stack<T>而不是Queue<T>,以提高内存效率。`Stack<T>`每次只包含少量节点,相反,Queue<T>将存储超过节点总数一半的数量。关于树的各种遍历方式,您可以查看this问题。
/// <summary>
/// Invokes a delegate for a node and all its descendants in parallel.
/// </summary>
public static ParallelLoopResult ParallelTraverse<TNode>(
    TNode root,
    ParallelOptions parallelOptions,
    Action<TNode, ParallelLoopState> body,
    Func<TNode, IReadOnlyList<TNode>> childrenSelector)
{
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(body);
    ArgumentNullException.ThrowIfNull(childrenSelector);
    IEnumerable<TNode> Iterator()
    {
        Stack<TNode> stack = new();
        stack.Push(root);
        while (stack.Count > 0)
        {
            TNode node = stack.Pop();
            yield return node;
            IReadOnlyList<TNode> children = childrenSelector(node);
            if (children is null) continue;
            for (int i = children.Count - 1; i >= 0; i--)
                stack.Push(children[i]);
        }
    }
    return Parallel.ForEach(Iterator(), parallelOptions, body);
}

用法示例:

ParallelOptions options = new() { MaxDegreeOfParallelism = Environment.ProcessorCount };
ParallelTraverse(root, options, (node, state) =>
{
    if (node.Property == someValue) DoSomething(node);
}, node => node.Children);

仅有body被并行处理。 childrenSelector按顺序调用(一次一个节点)。
第二个版本具有相同的签名和功能,只在行为上有所不同。 ParallelTraverseHierarchical确保在父节点的body完成之前不会调用所有节点的body。例如,在将节点保存到数据库中,并且保存节点需要其父节点的自动生成ID的情况下,这可能很重要。
/// <summary>
/// Invokes a delegate for a node and all its descendants in parallel.
/// Children are invoked after the completion of their parent.
/// </summary>
public static ParallelLoopResult ParallelTraverseHierarchical<TNode>(
    TNode root,
    ParallelOptions parallelOptions,
    Action<TNode, ParallelLoopState> body,
    Func<TNode, IReadOnlyList<TNode>> childrenSelector)
{
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(body);
    ArgumentNullException.ThrowIfNull(childrenSelector);
    using BlockingCollection<TNode> stack = new(new ConcurrentStack<TNode>());
    stack.Add(root);
    int pendingCount = stack.Count;

    Partitioner<TNode> partitioner = Partitioner.Create(
        stack.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);

    return Parallel.ForEach(partitioner, parallelOptions, (node, state) =>
    {
        try
        {
            body(node, state);
            if (state.ShouldExitCurrentIteration) return;
            IReadOnlyList<TNode> children = childrenSelector(node);
            if (children is null) return;
            Interlocked.Add(ref pendingCount, children.Count);
            // Ideally the children should be added all at once as an atomic
            // operation, but although the ConcurrentStack does have an atomic
            // PushRange method, the BlockingCollection doesn't expose this feature.
            for (int i = children.Count - 1; i >= 0; i--)
                stack.Add(children[i]);
        }
        finally
        {
            if (Interlocked.Decrement(ref pendingCount) == 0)
                stack.CompleteAdding();
        }
    });
}

这个版本同时并行处理bodychildrenSelector。其中一个委托应该是粗重的。如果两者都是轻量级的(在微秒级别),通过BlockingCollection<T>传递每个单独的节点的开销可能会抵消并行化的好处。我在这里上传了一个更复杂的ParallelTraverseHierarchical方法实现,基于一堆堆栈,可以每秒遍历数百万个节点。
上述情况中,在数据库中保存一棵树需要满足以下条件:TNode类型必须具有可变的数据库Id属性,并且必须引用其父节点。这样,每个节点都能够检索到其父节点的自动生成ID。如果无法满足此要求,可以通过将body参数的类型从Action更改为Func来增强上述实现。TArg将是一个泛型参数,用于向子节点传递关于其父节点的信息。这些信息可以通过阻塞集合BlockingCollection<(TNode, TArg)>进行传递。对于根节点root,TArg可以作为额外的参数传递给方法。

1
我喜欢你的实现避免了递归函数,从而也避免了对于非常高的树而言的堆栈溢出问题。 - StriplingWarrior

0
也许使用列表或数组而不是队列会更有帮助。另外,使用另一个列表/数组来填充下一个要访问的节点。无论如何,在完成整个宽度之前,您都不会处理该列表。可以尝试以下代码:
List<Node> todoList = new List<Node>();
todoList.Add(node);
while (todoList.Count > 0)
{
    // we'll be adding next nodes to process to this list so it needs to be thread-safe
    // or just sync access to a non-threadsafe list
    // if you know approx how many nodes you expect, you can pre-size the list
    ThreadSafeList<Node> nextList = new ThreadSafeList<Node>();  

    //todoList is readonly/static so can cache Count in simple variable
    int maxIndex  =  todoList.Count-1;
    // process todoList in parallel
    Parallel.For(0, maxIndex, i =>
    {
        // if list reads are thread-safe then no need to sync, otherwise sync
        Node x = todoList[i];

        //process x;
        // e.g. do somehting, get childrenNodesToWorkOnNext, etc.

        // add any child nodes that need to be processed next
        // e.g. nextList.add(childrenNodesToWorkOnNext);
    });

   // done with parallel processing by here so use the next todo list
   todoList = nextList;
)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接