如何加速递归搜索函数?

9
我写的搜索功能速度有问题。以下是该函数的步骤:
  1. 函数开始时需要两个表名参数:起始点和目标点
  2. 然后函数遍历一个包含50,000个元素的表-列组合列表,并检索所有与起始点表相关的组合。
  3. 然后,函数遍历每个检索到的组合,对于每个组合,再次遍历表-列组合列表,但这一次寻找匹配给定列的表。
  4. 最后,函数遍历从上一步返回的每个组合,对于每个组合,检查该表是否与目标表相同;如果是,则保存它,否则调用自身并传入该组合的表名。
函数目的在于能够追踪直接链接或有多级分离的表间链接。递归层数为固定整数值。 我的问题在于,每当我尝试运行该函数的两个搜索深度(现阶段不敢尝试更深),作业就会耗尽内存,或者我会失去耐心。有一次我等了17分钟,作业才因为内存耗尽而停止。 每个表的平均列数为28,标准差为34。
以下是图示,显示了各种表之间可能形成的连接示例: 每个列可能在多个表中有匹配。然后可以逐列搜索具有匹配列的表,以此类推 以下是我的代码:
private void FindLinkingTables(List<TableColumns> sourceList, TableSearchNode parentNode, string targetTable, int maxSearchDepth)
{
    if (parentNode.Level < maxSearchDepth)
    {
        IEnumerable<string> tableColumns = sourceList.Where(x => x.Table.Equals(parentNode.Table)).Select(x => x.Column);

        foreach (string sourceColumn in tableColumns)
        {
            string shortName = sourceColumn.Substring(1);

            IEnumerable<TableSearchNode> tables = sourceList.Where(
                x => x.Column.Substring(1).Equals(shortName) && !x.Table.Equals(parentNode.Table) && !parentNode.Ancenstory.Contains(x.Table)).Select(
                    x => new TableSearchNode { Table = x.Table, Column = x.Column, Level = parentNode.Level + 1 });
            foreach (TableSearchNode table in tables)
            {
                parentNode.AddChildNode(sourceColumn, table);
                if (!table.Table.Equals(targetTable))
                {
                    FindLinkingTables(sourceList, table, targetTable, maxSearchDepth);
                }
                else
                {
                    table.NotifySeachResult(true);
                }
            }
        }
    }
}

EDIT: 分离TableSearchNode逻辑并添加属性和方法,以保证完整性。

//TableSearchNode
public Dictionary<string, List<TableSearchNode>> Children { get; private set; }

//TableSearchNode
public List<string> Ancenstory
{
    get
    {
        Stack<string> ancestory = new Stack<string>();
        TableSearchNode ancestor = ParentNode;
        while (ancestor != null)
        {
            ancestory.Push(ancestor.tbl);
            ancestor = ancestor.ParentNode;
        }
        return ancestory.ToList();
    }
}

//TableSearchNode
public void AddChildNode(string referenceColumn, TableSearchNode childNode)
    {
        childNode.ParentNode = this;
        List<TableSearchNode> relatedTables = null;
        Children.TryGetValue(referenceColumn, out relatedTables);
        if (relatedTables == null)
        {
            relatedTables = new List<TableSearchNode>();
            Children.Add(referenceColumn, relatedTables);
        }
        relatedTables.Add(childNode);
    }

Thanks in advance for your help!


2
我正在更详细地查看这个问题,但是有一个评论是,如果性能是一个重要问题,您可能需要考虑删除所有的LINQ调用。 - nicholas
1
@nicholas 顺便问一下,你在哪里读到LINQ一定比其他方法慢的? - Daniel Kelley
1
@EricScherrer 非常感谢您提供的链接。不幸的是,由于每个函数在循环中可以调用自身多次,我发现我无法模仿这些示例;然而,我将 if (!table.Table.Equals(targetTable)) 更改为 if (table.Table.Equals(targetTable)) 并交换了内部语句,以便递归调用是最后一个语句。我不确定它在执行时如何翻译。如果您有更好的方法,请告诉我。顺便说一下,我已经以所有可能的方式出了大错(进程吃掉了1.5GB的RAM),所以不用担心 :) - Sinker
这不就是一种最短路径搜索吗?计算出在一步内可到达的表的集合。从中计算出在两步内可到达的表的集合,依此类推。 - usr
2
你在那些微小的方面优化代码是在浪费时间。你会得到一个恒定的加速,但即使是10倍也无济于事。你需要一个完全新的算法,具有更低的渐进成本。 - usr
显示剩余13条评论
4个回答

4
你浪费了很多内存。立即想到的是:
  1. First of all replace incoming List<TableColumns> sourceList with ILookup<string, TableColumns>. You should do this once before calling FindLinkingTables:

    ILookup<string, TableColumns> sourceLookup = sourceList.ToLookup(s => s.Table);
    FindLinkingTables(sourceLookup, parentNode, targetTable, maxSearchDepth);
    
  2. Do not call .ToList() if do not really need it. For example, if you are going only to enumerate all children of resulting list once, you do not need it. So your main function will looks like this:

    private void FindLinkingTables(ILookup<string, TableColumns> sourceLookup, TableSearchNode parentNode, string targetTable, int maxSearchDepth)
    {
        if (parentNode.Level < maxSearchDepth)
        {
            var tableColumns = sourceLookup[parentNode.Table].Select(x => x.Column);
    
            foreach (string sourceColumn in tableColumns)
            {
                string shortName = sourceColumn.Substring(1);
    
                var tables = sourceLookup
                    .Where(
                        group => !group.Key.Equals(parentNode.Table)
                                 && !parentNode.Ancenstory.Contains(group.Key))
                    .SelectMany(group => group)
                    .Where(tableColumn => tableColumn.Column.Substring(1).Equals(shortName))
                    .Select(
                        x => new TableSearchNode
                        {
                            Table = x.Table,
                            Column = x.Column,
                            Level = parentNode.Level + 1
                        });
    
                foreach (TableSearchNode table in tables)
                {
                    parentNode.AddChildNode(sourceColumn, table);
                    if (!table.Table.Equals(targetTable))
                    {
                        FindLinkingTables(sourceLookup, table, targetTable, maxSearchDepth);
                    }
                    else
                    {
                        table.NotifySeachResult(true);
                    }
                }
            }
        }
    }
    

    [Edit]

  3. Also in order to speedup remaining complex LINQ query, you can prepare yet another ILookup:

    ILookup<string, TableColumns> sourceColumnLookup = sourceLlist
            .ToLookup(t => t.Column.Substring(1));
    
    //...
    
    private void FindLinkingTables(
        ILookup<string, TableColumns> sourceLookup, 
        ILookup<string, TableColumns> sourceColumnLookup,
        TableSearchNode parentNode, string targetTable, int maxSearchDepth)
    {
        if (parentNode.Level >= maxSearchDepth) return;
    
        var tableColumns = sourceLookup[parentNode.Table].Select(x => x.Column);
    
        foreach (string sourceColumn in tableColumns)
        {
            string shortName = sourceColumn.Substring(1);
    
            var tables = sourceColumnLookup[shortName]
                .Where(tableColumn => !tableColumn.Table.Equals(parentNode.Table)
                                      && !parentNode.AncenstoryReversed.Contains(tableColumn.Table))
                .Select(
                    x => new TableSearchNode
                        {
                            Table = x.Table,
                            Column = x.Column,
                            Level = parentNode.Level + 1
                        });
    
    
            foreach (TableSearchNode table in tables)
            {
                parentNode.AddChildNode(sourceColumn, table);
                if (!table.Table.Equals(targetTable))
                {
                    FindLinkingTables(sourceLookup, sourceColumnLookup, table, targetTable, maxSearchDepth);
                }
                else
                {
                    table.NotifySeachResult(true);
                }
            }
        }
    }
    
  4. I've checked your Ancestory property. If IEnumerable<string> is enough for your needs check this implementation:

    public IEnumerable<string> AncenstoryEnum
    {
        get { return AncenstoryReversed.Reverse(); }
    }
    
    public IEnumerable<string> AncenstoryReversed
    {
        get
        {
            TableSearchNode ancestor = ParentNode;
            while (ancestor != null)
            {
                yield return ancestor.tbl;
                ancestor = ancestor.ParentNode;
            }
        }
    }
    

我修改了代码,使用IEnumerable代替List,不再需要ToList()。我将尝试使用ILookup。谢谢。 - Sinker
有两个查找表是个好主意。我会尝试一下。 - Sinker
还要检查Ancenstory属性的其他可能实现。 AncenstoryReversed枚举所有父项直到顶部,无需临时缓冲区。 但是,当然,AncenstoryEnum使用了额外的存储(它是在LINQ Reverse函数内创建的Array)。 - Woodman
以前从未见过yield。谢谢! - Sinker
哦,我错过了你在每个递归级别上调用Ancenstory属性的事实... 好的,我已经更正了列在第3点下的代码,所以它重用了AncenstoryReversed属性。至少不会有额外的内存成本... 但总体逻辑看起来可疑,也许你应该考虑其他方式来组合结果(记住这里提到的所有性能建议)。 - Woodman

2
我已经成功地将你的 FindLinkingTables 代码重构为以下内容:
private void FindLinkingTables(
    List<TableColumns> sourceList, TableSearchNode parentNode,
    string targetTable, int maxSearchDepth)
{
    if (parentNode.Level < maxSearchDepth)
    {
        var sames = sourceList.Where(w => w.Table == parentNode.Table);

        var query =
            from x in sames
            join y in sames
                on x.Column.Substring(1) equals y.Column.Substring(1)
            where !parentNode.Ancenstory.Contains(y.Table)
            select new TableSearchNode
            {
                Table = x.Table,
                Column = x.Column,
                Level = parentNode.Level + 1
            };

        foreach (TableSearchNode z in query)
        {
            parentNode.AddChildNode(z.Column, z);
            if (z.Table != targetTable)
            {
                FindLinkingTables(sourceList, z, targetTable, maxSearchDepth);
            }
            else
            {
                z.NotifySeachResult(true);
            }
        }
    }
}

我认为你在查询语句的where !parentNode.Ancenstory.Contains(y.Table)部分的逻辑有缺陷。我认为你需要重新思考你的搜索操作,并看看你能得出什么。


我添加了更多的代码来展示“祖先”是如何构建到节点中的。无论如何,我会进行审查以查看是否存在任何问题。调试是我的好朋友! - Sinker

2

在我看来,有几个方面值得注意:

  1. In your Where clause you make a call to parentNode.Ancenstory; this has logarithmic run time by itself, then you make a call to .Contains on the List<string> it returns, which is another logarithmic call (it's linear, but the list has a logarithmic number of elements). What you're doing here is checking for cycles in your graph. These costs can be made constant by adding a field to TableColumns.Table which stores information on how that Table has been processed by the algorithm (alternatively, you could use a Dictionary<Table, int>, to avoid adding a field to the object). Typically, in a DFS algorithm, this field is either White, Grey, or Black - White for unprocessed (you haven't seen that Table before), Grey for an ancestor of the Table currently being processed, and Black for when you're done processing that Table and all of its children. To update your code to do this, it'd look like:

    foreach (string sourceColumn in tableColumns)
    {
        string shortName = sourceColumn.Substring(1);
    
        IEnumerable<TableSearchNode> tables =
            sourceList.Where(x => x.Column[0].Equals(shortName) &&
                                  x.Color == White)
                      .Select(x => new TableSearchNode
                                       {
                                            Table = x.Table,
                                            Column = x.Column,
                                            Level = parentNode.Level + 1
                                        });
        foreach (TableSearchNode table in tables)
        {
            parentNode.AddChildNode(sourceColumn, table);
    
            table.Color = Grey;
    
            if (!table.Table.Equals(targetTable))
            {
                FindLinkingTables(sourceList, table, targetTable, maxSearchDepth);
            }
            else
            {
                table.NotifySeachResult(true);
            }
    
            table.Color = Black;
        }
    }
    
  2. As you mentioned above, you're running out of memory. The easiest fix for this is to remove the recursive call (which is acting as an implicit stack) and replace it with an explicit Stack data structure, removing the recursion. Additionally, this changes the recursion to a loop, which C# is better at optimizing.

    private void FindLinkingTables(List<TableColumns> sourceList, TableSearchNode root, string targetTable, int maxSearchDepth)
    {
        Stack<TableSearchNode> stack = new Stack<TableSearchNode>();
        TableSearchNode current;
    
        stack.Push(root);
    
        while (stack.Count > 0 && stack.Count < maxSearchDepth)
        {
            current = stack.Pop();
    
            var tableColumns = sourceList.Where(x => x.Table.Equals(current.Table))
                                         .Select(x => x.Column);
    
            foreach (string sourceColumn in tableColumns)
            {
                string shortName = sourceColumn.Substring(1);
    
                IEnumerable<TableSearchNode> tables =
                    sourceList.Where(x => x.Column[0].Equals(shortName) &&
                                          x.Color == White)
                              .Select(x => new TableSearchNode
                                               {
                                                    Table = x.Table,
                                                    Column = x.Column,
                                                    Level = current.Level + 1
                                                });
                foreach (TableSearchNode table in tables)
                {
                    current.AddChildNode(sourceColumn, table);
    
                    if (!table.Table.Equals(targetTable))
                    {
                        table.Color = Grey;
                        stack.Push(table);
                    }
                    else
                    {
                        // you could go ahead and construct the ancestry list here using the stack
                        table.NotifySeachResult(true);
                        return;
                    }
                }
            }
    
            current.Color = Black;
    
        }
    }
    
  3. Finally, we don't know how costly Table.Equals is, but if the comparison is deep then that could be adding significant run time to your inner loop.


这里有很多需要尝试和理解的内容。我查了一下DFS,因为我以前没学过它。我想进一步探索这个选项。我认为我应该开始重复使用节点来节省内存。让我看看能否通过你的例子。谢谢。 - Sinker
1
@Sinker 维基百科始终是算法学习的好去处:http://en.wikipedia.org/wiki/Depth-first_search - Zack Butcher

2

好的,这里有一个答案,基本上放弃了您发布的所有代码。

首先,您应该将您的List<TableColumns>哈希化为可以索引的东西,而无需遍历整个列表。

为此,我编写了一个名为 TableColumnIndexer 的类:

class TableColumnIndexer
{
    Dictionary<string, HashSet<string>> tables = new Dictionary<string, HashSet<string>>();

    public void Add(string tableName, string columnName)
    {
        this.Add(new TableColumns { Table = tableName, Column = columnName });
    }

    public void Add(TableColumns tableColumns)
    {
        if(! tables.ContainsKey(tableColumns.Table))
        {
            tables.Add(tableColumns.Table, new HashSet<string>());
        }

        tables[tableColumns.Table].Add(tableColumns.Column);
    }

    // .... More code to follow

现在,一旦您将所有表/列值注入到此索引类中,就可以调用递归方法来检索两个表之间的最短祖先链接。这里的实现有些松散,但目前是为了清晰而非性能而编写的:

    // .... continuation of TableColumnIndexer class
    public List<string> GetShortestAncestry(string parentName, string targetName, int maxDepth)
    {
        return GetSortestAncestryR(parentName, targetName, maxDepth - 1, 0, new Dictionary<string,int>());
    }

    private List<string> GetSortestAncestryR(string currentName, string targetName, int maxDepth, int currentDepth, Dictionary<string, int> vistedTables)
    {
        // Check if we have visited this table before
        if (!vistedTables.ContainsKey(currentName))
            vistedTables.Add(currentName, currentDepth);

        // Make sure we have not visited this table at a shallower depth before
        if (vistedTables[currentName] < currentDepth)
            return null;
        else
            vistedTables[currentName] = currentDepth;


        if (currentDepth <= maxDepth)
        {
            List<string> result = new List<string>();

            // First check if the current table contains a reference to the target table
            if (tables[currentName].Contains(targetName))
            {
                result.Add(currentName);
                result.Add(targetName);
                return result;
            }
            // If not try to see if any of the children tables have the target table
            else
            {
                List<string> bestResult = null;
                    int bestDepth = int.MaxValue;

                foreach (string childTable in tables[currentName])
                {
                    var tempResult = GetSortestAncestryR(childTable, targetName, maxDepth, currentDepth + 1, vistedTables);

                    // Keep only the shortest path found to the target table
                    if (tempResult != null && tempResult.Count < bestDepth)
                    {
                        bestDepth = tempResult.Count;
                        bestResult = tempResult;
                    }
                }

                // Take the best link we found and add it to the result list
                if (bestDepth < int.MaxValue && bestResult != null)
                {
                    result.Add(currentName);
                    result.AddRange(bestResult);
                    return result;
                }
                // If we did not find any result, return nothing
                else
                {
                    return null;
                }
            }
        }
        else
        {
            return null;
        }
    }
}

现在所有的代码都只是一个(有点啰嗦的)最短路径算法的实现,它允许循环路径和源表与目标表之间的多条路径。请注意,如果两个表之间有相同深度的两条路线,算法将只选择其中一条(不一定可预测)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接