合并两个数组并排序

20

给定两个类似以下形式的已排序数组:

a = array([1,2,4,5,6,8,9])

b = array([3,4,7,10])

我希望输出结果为:

c = array([1,2,3,4,5,6,7,8,9,10])

或:

c = array([1,2,3,4,4,5,6,7,8,9,10])

我知道我可以执行以下操作:

c = unique(concatenate((a,b))

我在想是否有更快的方法来处理这个问题,因为我所处理的数组有数百万个元素。

欢迎任何想法。谢谢


我真的很怀疑如果不编写编译扩展来组合“concatenate”、“unique”和“sort”,你是否能做得更好。 - mgilson
你可以至少省略掉 sort,因为从 unique 输出的结果已经保证是排序过的。 - Fred Foo
8个回答

36

由于您使用了numpy,我怀疑bisec并没有帮助到您...所以我建议使用两个小建议:

  1. 不要使用np.sort,而是使用c.sort()方法,该方法原地对数组进行排序并避免复制。
  2. np.unique必须使用不在原地的np.sort。因此,不要使用np.unique,可以手动处理逻辑。即先进行排序(原地排序),然后手动执行np.unique方法(还需检查其Python代码),其中使用flag = np.concatenate(([True], ar[1:] != ar[:-1]))这个操作可以得到unique = ar[flag](其中ar已经被排序)。为了更好,您可能应该直接在原地创建flag数组,即flag = np.ones(len(ar), dtype=bool),然后np.not_equal(ar[1:], ar[:-1], out=flag[1:]),这样可以避免一个完整的flag副本。
  3. 我不确定这一点。但.sort有3种不同的算法,因为您的数组可能已经几乎排好序了,更改排序方法可能会产生速度差异。

这将使全部内容接近您获得的结果(在预处理之前不进行去重):

def insort(a, b, kind='mergesort'):
    # took mergesort as it seemed a tiny bit faster for my sorted large array try.
    c = np.concatenate((a, b)) # we still need to do this unfortunatly.
    c.sort(kind=kind)
    flag = np.ones(len(c), dtype=bool)
    np.not_equal(c[1:], c[:-1], out=flag[1:])
    return c[flag]

如果你把这些建议整合成一个函数,我很乐意将它添加到我在答案中发布的timeit框架中。 - mgilson
@mgilson已经添加了它,但如果你想进行比较,请与大数组进行比较...小数组可能对此目的不太准确,因为在这个应用程序中,开销太小并且无关紧要。 - seberg
测试成功——目前您在数组大小为10000的领先位置。 - mgilson
是的,你的速度绝对是最快的。 - Jun
1
我必须说——这个答案非常出色。(正如我在原问题的评论中所记录的)我原本认为你无法对OP发布的代码进行太多改进。干得好。(希望其他人看到这一点并慷慨地点赞)。 - mgilson
说实话,我自己也没抱太大希望,但是我猜对于给定的数据来说,归并排序显然会有很大的区别(除非对于 OP 来说,交换成为了一个问题)。实际上,OP 的代码等价于 np.union1d... - seberg

12

在数组中间插入元素是一种非常低效的操作,因为它们在内存中是平面结构,所以每当你插入另一个元素时,你都需要将其余的所有元素向右移。因此,你可能不想使用bisect。这样做的复杂度大约为O(N ^ 2)

你当前的方法是O(n * log(n)),所以这已经改善了很多,但还不够完美。

将所有元素插入哈希表(比如set)也是一种方法。这将花费O(N)的时间来进行去重,但之后你还需要排序,这将花费O(n * log(n))的时间。仍然不是很好。

真正的O(N)解决方案涉及分配一个数组,然后通过取输入列表的最小头部逐个填充它的元素,即合并。不幸的是,似乎没有numpy或Python有这样的东西。解决方案可能是在Cython中编写一个。

它大概会像下面这样:

def foo(numpy.ndarray[int, ndim=1] out,
        numpy.ndarray[int, ndim=1] in1, 
        numpy.ndarray[int, ndim=1] in2):

        cdef int i = 0
        cdef int j = 0
        cdef int k = 0
        while (i!=len(in1)) or (j!=len(in2)):
            # set out[k] to smaller of in[i] or in[j]
            # increment k
            # increment one of i or j

同意,编写一个像这样的小型Cython扩展是一个不错的选择。 - seberg
1
巨大的效率提升,只要您对预排序输入的假设是正确的。看起来OP就是这么打算的。 - hobs
1
你可以随时检查你的输入是否已排序,这只需要 O(n) 的时间,因此不会增加复杂度。 - jleahy

9

当你对时间问题感到好奇时,最好的方法就是使用timeit。下面列出了各种方法及其时间的子集:

import numpy as np
import timeit
import heapq



def insort(a, x, lo=0, hi=None):
    if hi is None: hi = len(a)
    while lo < hi:
        mid = (lo+hi)//2
        if x < a[mid]: hi = mid
        else: lo = mid+1
    return lo, np.insert(a, lo, [x])

size=10000
a = np.array(range(size))
b = np.array(range(size))

def op(a,b):
    return np.unique(np.concatenate((a,b)))

def martijn(a,b):
    c = np.copy(a)
    lo = 0
    for i in b:
        lo, c = insort(c, i, lo)
    return c

def martijn2(a,b):
    c = np.zeros(len(a) + len(b), a.dtype)
    for i, v in enumerate(heapq.merge(a, b)):
        c[i] = v

def larsmans(a,b):
    return np.array(sorted(set(a) | set(b)))

def larsmans_mod(a,b):
    return np.array(set.union(set(a),b))


def sebastian(a, b, kind='mergesort'):
    # took mergesort as it seemed a tiny bit faster for my sorted large array try.
    c = np.concatenate((a, b)) # we still need to do this unfortunatly.
    c.sort(kind=kind)
    flag = np.ones(len(c), dtype=bool)
    np.not_equal(c[1:], c[:-1], out=flag[1:])
    return c[flag]

结果:

martijn2     25.1079499722
OP       1.44831800461
larsmans     9.91507601738
larsmans_mod     5.87612199783
sebastian    3.50475311279e-05

我在这里的具体贡献是 larsmans_mod,它避免了创建2个集合 - 它只创建一个集合,在这样做的过程中几乎将执行时间缩短了一半。

编辑 删除了martijn,因为它太慢而无法竞争。 还测试了稍微大一些(排序)输入数组。 我也没有测试输出的正确性...


当然,如果我们从更大的数组开始,我会很感兴趣看看它如何扩展... - mgilson
1
有趣的是,显然不断重新创建c数组正在折磨我。 - Martijn Pieters
@MartijnPieters -- 你最近的尝试更好了,但仍然不如 OP。 - mgilson
啊,这是因为我从来没有认真使用过numpy。 - Martijn Pieters

4
除了使用 bisect.insort 的其他答案外,如果您对性能不满意,可以尝试使用 blist 模块与 bisect。它应该会提高性能。
传统的 list 插入复杂度为 O(n),而 blist 的插入复杂度为 O(log(n))
此外,您的数组似乎已经排序。如果是这样,您可以使用来自 heapq 模块的 merge 函数利用两个数组都已排序的事实。这种方法将需要额外的开销,因为会在内存中创建一个新的数组。这可能是要考虑的选择,因为该解决方案的时间复杂度为 O(n+m),而使用 insort 的解决方案的复杂度为 O(n*m)(n个元素 * m次插入)。
import heapq

a = [1,2,4,5,6,8,9]
b = [3,4,7,10]


it = heapq.merge(a,b) #iterator consisting of merged elements of a and b
L = list(it) #list made of it
print(L)

输出:

[1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]

如果您想删除重复值,可以使用 groupby

import heapq
import itertools

a = [1,2,4,5,6,8,9]
b = [3,4,7,10]


it = heapq.merge(a,b) #iterator consisting of merged elements of a and b
it = (k for k,v in itertools.groupby(it))
L = list(it) #list made of it
print(L)

输出:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

看起来很不错,但我担心它的内存效率是否能像numpy数组一样高。 - Jun
@Jun 你应该进行一些测试来确定内存消耗。传统数组和列表的问题在于,插入元素时需要将插入位置后面的所有元素向后移动1个位置,这需要O(n)的时间复杂度。而在blist中,他们使用另一种数据结构,因此复杂度为O(log(n))。对于大型数组和列表,这将极大地改变事情的发展。 - ovgolovin

2
你可以使用bisect模块来进行此类合并,将第二个Python列表合并到第一个列表中。 bisect*函数适用于numpy数组,但insort*函数不适用。很容易使用模块源代码来调整算法,它非常基础:
from numpy import array, copy, insert

def insort(a, x, lo=0, hi=None):
    if hi is None: hi = len(a)
    while lo < hi:
        mid = (lo+hi)//2
        if x < a[mid]: hi = mid
        else: lo = mid+1
    return lo, insert(a, lo, [x])

a = array([1,2,4,5,6,8,9])
b = array([3,4,7,10])

c = copy(a)
lo = 0
for i in b:
    lo, c = insort(c, i, lo)

虽然自定义的insort在这里并没有添加任何内容,但默认的bisect.bisect也能很好地工作:

import bisect

c = copy(a)
lo = 0
for i in b:
    lo = bisect.bisect(c, i)
    c = insert(c, i, lo)

使用这个修改后的insort比组合和排序更加高效。因为b也是排序的,所以我们可以跟踪lo插入点并从那里开始搜索下一个点,而不是在每次循环中考虑整个数组。
如果您不需要保留a,只需直接对该数组进行操作并保存副本。
更加高效的方法:由于两个列表都已排序,因此我们可以使用heapq.merge
from numpy import zeros
import heapq

c = zeros(len(a) + len(b), a.dtype)
for i, v in enumerate(heapq.merge(a, b)):
    c[i] = v

我刚想出来的代码。但是我意识到,如果数组'b'有数百万个元素,那么机器将会在你的代码中重新分配数百万次数组'c'。这对我来说看起来不太高效。 - Jun
@Jun:是的,我不太喜欢使用insert,所以我找到了一种更好的方法,仍然使用预分配的数组和heapq.merge。 - Martijn Pieters

1
使用bisect模块来实现此功能:
import bisect

a = array([1,2,4,5,6,8,9])
b = array([3,4,7,10])

for i in b:
    pos = bisect.bisect(a, i)
    insert(a,[pos],i) 

我现在无法测试,但应该可以工作。


NumPy数组没有insert方法。 - mgilson
1
如果它确实有一个“insert”方法,你应该使用“bisect.insort”代替。 - Martijn Pieters
由于第二个数组也是排序的,每次跟踪插入位置可能更好? - Jun
@Jun,没问题,你可以这样做。 - Charlie G

1

sortednp包实现了对排序好的numpy数组的高效合并,仅对值进行排序,而不使它们独一无二:

import numpy as np
import sortednp
a = np.array([1,2,4,5,6,8,9])
b = np.array([3,4,7,10])
c = sortednp.merge(a, b)

我测量了时间并将其与类似帖子的此答案进行比较,在那里它优于numpy的mergesort(v1.17.4)。


0

似乎没有人提到union1dunion1d)。目前,它是unique(concatenate((ar1, ar2)))的快捷方式,但它是一个易于记忆的短名称,并且由于它是一个库函数,有潜力被numpy开发人员进行优化。对于大型数组,它的性能非常类似于seberg所接受的答案中的insort。这是我的基准测试:

import numpy as np

def insort(a, b, kind='mergesort'):
    # took mergesort as it seemed a tiny bit faster for my sorted large array try.
    c = np.concatenate((a, b))  # we still need to do this unfortunatly.
    c.sort(kind=kind)
    flag = np.ones(len(c), dtype=bool)
    np.not_equal(c[1:], c[:-1], out=flag[1:])
    return c[flag]

size = int(1e7)
a = np.random.randint(np.iinfo(np.int).min, np.iinfo(np.int).max, size)
b = np.random.randint(np.iinfo(np.int).min, np.iinfo(np.int).max, size)

np.testing.assert_array_equal(insort(a, b), np.union1d(a, b))

import timeit
repetitions = 20
print("insort: %.5fs" % (timeit.timeit("insort(a, b)", "from __main__ import a, b, insort", number=repetitions)/repetitions,))
print("union1d: %.5fs" % (timeit.timeit("np.union1d(a, b)", "from __main__ import a, b; import numpy as np", number=repetitions)/repetitions,))

我的机器上的输出:

insort: 1.69962s
union1d: 1.66338s

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接