在Python中查找邻居的邻居的最有效方法

5

假设有两个数组IJ,它们确定了相邻的配对:

I = np.array([0, 0, 1, 2, 2, 3])
J = np.array([1, 2, 0, 0, 3, 2])

这意味着元素0有两个邻居12。元素1只有0作为邻居,依此类推。
创建包含所有邻居三元组I'J'K'的数组,使得在条件ijk为不同元素(i != j != k)的情况下,ji的邻居,kj的邻居,最有效的方法是什么?
Ip = np.array([0, 0, 2, 3])
Jp = np.array([2, 2, 0, 2])
Kp = np.array([0, 3, 1, 0])

当然,一种方法是遍历每个元素。有没有更有效率的算法?(处理1000万到5亿个元素)


3
为什么要使用数组?这是一个图问题:使用networkx或其他图形包。让包来处理你的效率问题。 - Prune
1
请明确指定您的用例。您需要生成所有可能的三元组吗?这是按需服务吗?值得备忘录记录吗? - Prune
1
你说你想创建三元组,其中中间节点连接到每个端节点。然而,你没有说明如何决定要生成哪些三元组。了解程序规范对于选择设计和实现非常重要。 - Prune
@Roy,如果获取每个顶点的三元组数量而不是所有三元组足够的话,有一个优雅而快速的解决方案。计算图形的邻接矩阵(如下面我的解决方案所示),并对其进行平方。第i-j个条目将包含从顶点i到顶点j的长度为2的路径数。 - Shir
1
一百万行的邻接矩阵... - anon01
显示剩余3条评论
4个回答

5

我会采用非常简单的方法,使用pandas(IJ 是您的numpy数组):

import pandas as pd

df1 = pd.DataFrame({'I': I, 'J': J})
df2 = df1.rename(columns={'I': 'K', 'J': 'I'})

result = pd.merge(df2, df1, on='I').query('K != J')

优点在于 pandas.merge 依赖于非常快速的底层数值实现。此外,您可以通过使用索引进行合并来进一步加快计算速度。
为了减少这种方法所需的内存,可能有必要在合并它们之前减小 df1df2 的大小(例如,通过将列的 dtype 更改为适合您的需要的内容)。
以下是如何优化计算速度和内存的示例:
from timeit import timeit
import numpy as np
import pandas as pd

I = np.random.randint(0, 10000, 1000000)
J = np.random.randint(0, 10000, 1000000)

df1_64 = pd.DataFrame({'I': I, 'J': J})
df1_32 = df1_64.astype('int32')
df2_64 = df1_64.rename(columns={'I': 'K', 'J': 'I'})
df2_32 = df1_32.rename(columns={'I': 'K', 'J': 'I'})

timeit(lambda: pd.merge(df2_64, df1_64, on='I').query('K != J'), number=1)
# 18.84
timeit(lambda: pd.merge(df2_32, df1_32, on='I').query('K != J'), number=1)
# 9.28

1
这正是我正在寻找的。非常聪明的解决方案!感谢您分享它。 - Roy

1

这是使用networkx提供的初始解决方案,它是一个优化的图计算库:

import numpy as np
import networkx as nx

I = np.array([0, 0, 1, 2, 2, 3])
J = np.array([1, 2, 0, 0, 3, 2])

I_, J_, K_ = [], [], [],
num_nodes = np.max(np.concatenate([I,J])) + 1
A = np.zeros((num_nodes, num_nodes))
A[I,J] = 1
print("Adjacency Matrix:")
print(A)
G = nx.from_numpy_matrix(A)

for i in range(num_nodes):
    first_neighbors = list(G.neighbors(i))

    for j in first_neighbors:
        second_neighbor = list(G.neighbors(j))
        second_neighbor_no_circle = list(filter(lambda node: node != i, second_neighbor))
        num_second_neighbors = len(second_neighbor_no_circle)

        if num_second_neighbors > 0:
            I_.extend(num_second_neighbors * [i])
            J_.extend(num_second_neighbors * [j])
            K_.extend(second_neighbor_no_circle)
            
I_, J_, K_ = np.array(I_), np.array(J_), np.array(K_)
print("result:")
print(I_)
print(J_)
print(K_)

####### Output ####### 
Adjacency Matrix:
[[0. 1. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 1.]
 [0. 0. 1. 0.]]
result:
[0 1 2 3]
[2 0 0 2]
[3 2 1 0]

我对上面的代码使用了%%timeit而没有打印语句来检查运行时间:49 µs ± 113 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

复杂度分析: 找到所有邻居的邻居本质上是在深度优先搜索算法中进行2步。这可能需要,取决于图的拓扑结构,最多O(|V| + |E|),其中|E|是图中的边数,|V|是顶点数。

据我所知,在一般图上没有更好的算法。 但是,如果您知道有关图的某些特殊属性,则可以更紧密地限制运行时间或者根据此知识改变当前算法。

例如,如果您知道所有顶点最多具有d条边,并且图具有一个连通组件,则此实现的界限变为O(2d),如果d << |E|,则相当不错。

如果您有任何问题,请告诉我。


1
谢谢,但是对于大量节点来说,这似乎不起作用。矩阵大小将随 N^2 增加。 - Roy
很遗憾...看看这个帖子,他们讨论了一些处理大型图形的解决方案。 - Shir

1
你需要寻找的是图中长度为3的所有路径(all paths of length 3)。你可以使用以下简单的递归算法来实现:
import networkx as nx

def findPaths(G,u,n):
    """Returns a list of all paths of length `n` starting at vertex `u`."""
    if n==1:
        return [[u]]
    paths = [[u]+path for neighbor in G.neighbors(u) for path in findPaths(G,neighbor,n-1) if u not in path]
    return paths

# Generating graph
vertices = np.unique(I)
edges = list(zip(I,J))
G = nx.Graph()
G.add_edges_from(edges)

# Grabbing all 3-paths
paths = [path for v in vertices for path in findPaths(G,v,3)]

paths
>>> [[0, 2, 3], [1, 0, 2], [2, 0, 1], [3, 2, 0]]

1

生成所有三元组的算法并没有什么特别神奇的方法。您可以通过有序搜索来避免重新获取节点的邻居,但仅限于此。

  • 创建一个空节点列表N。
  • 将某些起始节点S添加到N中。
  • 当N不为空时
    • 从列表中弹出一个节点; 将其称为A。
    • 创建其邻居集合A'。
    • 对于A的每个邻居B
      • 对于A'中的每个元素a
        • 生成三元组(a, A, B)
      • 如果尚未检查,则将B添加到要检查的节点列表中。

这有所帮助吗?仍然需要处理算法中的一些细节,例如避免重复生成和移动团的要点。


谢谢,不过我想避免使用列表和循环。也许可以基于itertools或像bincount这样的技巧来实现。 - Roy
3
Itertools仅仅是将循环隐藏起来,并通过生成器替换显式循环。具体的解决方案将取决于您使用的数据结构。如果您尝试编写代码,但遇到困难,则可以在 Stack Overflow 上发布一个好问题。对于 Stack Overflow 的宗旨而言,这个问题已经偏离了轨道。 - Prune
话虽如此,你的三元组生成应该很简单,使用嵌套列表推导,并检查你不会生成 (A, B, A) 三元组。 - Prune
我编辑了问题以澄清ABA。整个重点在于效率,将循环从Python移动到数值库或使用数组而不是列表通常是加速的最佳方法。 - Roy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接