将x和y数组点的笛卡尔积转换为二维点的单一数组

209

我有两个numpy数组,分别定义了网格的x和y轴。例如:

x = numpy.array([1,2,3])
y = numpy.array([4,5])

我想要生成这些数组的笛卡尔积,以生成:

array([[1,4],[2,4],[3,4],[1,5],[2,5],[3,5]])

我需要在一个循环中多次执行此操作,因此希望找到一种不太低效的方法。我认为将它们转换为 Python 列表并使用 itertools.product 然后再转换回 NumPy 数组不是最有效的形式。


我注意到itertools方法中最昂贵的步骤是从列表转换为数组的最后一步。如果没有这个最后一步,它比Ken的示例快两倍。 - Alexey Lebedev
18个回答

201

一个典型的cartesian_product(几乎)

对于这个问题,有许多不同属性的方法。有些比其他的更快,有些更通用。经过大量测试和调整,我发现以下函数计算 n 维cartesian_product时,对于许多输入而言比大多数其他方法都更快。对于一对略微更复杂但在许多情况下甚至更快的方法,请参见Paul Panzer的答案。

鉴于该答案,这已不再是我所知道的numpy中最快的cartesian product实现。然而,我认为它的简单性将继续使其成为未来改进的有用基准:

def cartesian_product(*arrays):
    la = len(arrays)
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty([len(a) for a in arrays] + [la], dtype=dtype)
    for i, a in enumerate(numpy.ix_(*arrays)):
        arr[...,i] = a
    return arr.reshape(-1, la)

值得一提的是,这个函数以一种不寻常的方式使用了ix_;尽管ix_的文档使用是为了在数组中生成索引,但恰好具有相同形状的数组可以用于广播赋值。非常感谢mgilson,他启发了我尝试以这种方式使用ix_,以及unutbu,他对这个答案提供了一些极其有用的反馈,包括建议使用numpy.result_type

值得注意的替代方案

有时以Fortran顺序编写连续的内存块会更快。这就是这个替代方案“cartesian_product_transpose”的基础,它在某些硬件上已被证明比“cartesian_product”更快(见下文)。然而,使用相同原理的Paul Panzer的答案甚至更快。尽管如此,我仍将其包含在此处供感兴趣的读者参考:
def cartesian_product_transpose(*arrays):
    broadcastable = numpy.ix_(*arrays)
    broadcasted = numpy.broadcast_arrays(*broadcastable)
    rows, cols = numpy.prod(broadcasted[0].shape), len(broadcasted)
    dtype = numpy.result_type(*arrays)

    out = numpy.empty(rows * cols, dtype=dtype)
    start, end = 0, rows
    for a in broadcasted:
        out[start:end] = a.reshape(-1)
        start, end = end, end + rows
    return out.reshape(cols, rows).T

了解 Panzer 的方法后,我写了一个新版本,速度几乎与他的一样快,而且几乎和 cartesian_product 一样简单:

def cartesian_product_simple_transpose(arrays):
    la = len(arrays)
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty([la] + [len(a) for a in arrays], dtype=dtype)
    for i, a in enumerate(numpy.ix_(*arrays)):
        arr[i, ...] = a
    return arr.reshape(la, -1).T

这个函数似乎有一些恒定的时间开销,使它在小输入时比Panzer的实现运行得更慢。但对于更大的输入,在我运行的所有测试中,它的表现和他最快的实现(cartesian_product_transpose_pp)一样好。

在接下来的部分中,我包括了一些其他替代方案的测试。这些测试现在已经有些过时了,但是为了避免重复劳动,我决定将它们保留在这里以供历史研究之用。要获取最新的测试,请参见Panzer的答案,以及Nico Schlömer的答案。

与其他替代方案的测试

这里是一组测试,展示了一些函数相对于许多替代方案提供的性能提升。所有这些测试都是在一个四核机器上进行的,运行Mac OS 10.12.5、Python 3.6.1和numpy 1.12.1。硬件和软件的变化可能会产生不同的结果,所以你自己运行这些测试以确保结果!

定义:

import numpy
import itertools
from functools import reduce

### Two-dimensional products ###

def repeat_product(x, y):
    return numpy.transpose([numpy.tile(x, len(y)), 
                            numpy.repeat(y, len(x))])

def dstack_product(x, y):
    return numpy.dstack(numpy.meshgrid(x, y)).reshape(-1, 2)

### Generalized N-dimensional products ###

def cartesian_product(*arrays):
    la = len(arrays)
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty([len(a) for a in arrays] + [la], dtype=dtype)
    for i, a in enumerate(numpy.ix_(*arrays)):
        arr[...,i] = a
    return arr.reshape(-1, la)

def cartesian_product_transpose(*arrays):
    broadcastable = numpy.ix_(*arrays)
    broadcasted = numpy.broadcast_arrays(*broadcastable)
    rows, cols = numpy.prod(broadcasted[0].shape), len(broadcasted)
    dtype = numpy.result_type(*arrays)

    out = numpy.empty(rows * cols, dtype=dtype)
    start, end = 0, rows
    for a in broadcasted:
        out[start:end] = a.reshape(-1)
        start, end = end, end + rows
    return out.reshape(cols, rows).T

# from https://dev59.com/V3M_5IYBdhLWcg3w1G6N#1235363

def cartesian_product_recursive(*arrays, out=None):
    arrays = [numpy.asarray(x) for x in arrays]
    dtype = arrays[0].dtype

    n = numpy.prod([x.size for x in arrays])
    if out is None:
        out = numpy.zeros([n, len(arrays)], dtype=dtype)

    m = n // arrays[0].size
    out[:,0] = numpy.repeat(arrays[0], m)
    if arrays[1:]:
        cartesian_product_recursive(arrays[1:], out=out[0:m,1:])
        for j in range(1, arrays[0].size):
            out[j*m:(j+1)*m,1:] = out[0:m,1:]
    return out

def cartesian_product_itertools(*arrays):
    return numpy.array(list(itertools.product(*arrays)))

### Test code ###

name_func = [('repeat_product',                                                 
              repeat_product),                                                  
             ('dstack_product',                                                 
              dstack_product),                                                  
             ('cartesian_product',                                              
              cartesian_product),                                               
             ('cartesian_product_transpose',                                    
              cartesian_product_transpose),                                     
             ('cartesian_product_recursive',                           
              cartesian_product_recursive),                            
             ('cartesian_product_itertools',                                    
              cartesian_product_itertools)]

def test(in_arrays, test_funcs):
    global func
    global arrays
    arrays = in_arrays
    for name, func in test_funcs:
        print('{}:'.format(name))
        %timeit func(*arrays)

def test_all(*in_arrays):
    test(in_arrays, name_func)

# `cartesian_product_recursive` throws an 
# unexpected error when used on more than
# two input arrays, so for now I've removed
# it from these tests.

def test_cartesian(*in_arrays):
    test(in_arrays, name_func[2:4] + name_func[-1:])

x10 = [numpy.arange(10)]
x50 = [numpy.arange(50)]
x100 = [numpy.arange(100)]
x500 = [numpy.arange(500)]
x1000 = [numpy.arange(1000)]

测试结果:
In [2]: test_all(*(x100 * 2))
repeat_product:
67.5 µs ± 633 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
dstack_product:
67.7 µs ± 1.09 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
cartesian_product:
33.4 µs ± 558 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
cartesian_product_transpose:
67.7 µs ± 932 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
cartesian_product_recursive:
215 µs ± 6.01 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
cartesian_product_itertools:
3.65 ms ± 38.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [3]: test_all(*(x500 * 2))
repeat_product:
1.31 ms ± 9.28 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
dstack_product:
1.27 ms ± 7.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
cartesian_product:
375 µs ± 4.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
cartesian_product_transpose:
488 µs ± 8.88 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
cartesian_product_recursive:
2.21 ms ± 38.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_itertools:
105 ms ± 1.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [4]: test_all(*(x1000 * 2))
repeat_product:
10.2 ms ± 132 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
dstack_product:
12 ms ± 120 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product:
4.75 ms ± 57.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_transpose:
7.76 ms ± 52.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_recursive:
13 ms ± 209 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_itertools:
422 ms ± 7.77 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

在所有情况下,本答案开头定义的cartesian_product是最快的。
对于那些接受任意数量输入数组的函数,当len(arrays) > 2时,检查性能是值得的。(在我确定为什么cartesian_product_recursive在这种情况下会抛出错误之前,我已从这些测试中删除它。)
In [5]: test_cartesian(*(x100 * 3))
cartesian_product:
8.8 ms ± 138 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_transpose:
7.87 ms ± 91.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_itertools:
518 ms ± 5.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [6]: test_cartesian(*(x50 * 4))
cartesian_product:
169 ms ± 5.1 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
cartesian_product_transpose:
184 ms ± 4.32 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
cartesian_product_itertools:
3.69 s ± 73.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [7]: test_cartesian(*(x10 * 6))
cartesian_product:
26.5 ms ± 449 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
cartesian_product_transpose:
16 ms ± 133 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
cartesian_product_itertools:
728 ms ± 16 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [8]: test_cartesian(*(x10 * 7))
cartesian_product:
650 ms ± 8.14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
cartesian_product_transpose:
518 ms ± 7.09 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
cartesian_product_itertools:
8.13 s ± 122 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

这些测试表明,在输入数组数量超过(大约)四个之前,cartesian_product 保持竞争力。在此之后,cartesian_product_transpose 稍微占优势。
值得重申的是,使用其他硬件和操作系统的用户可能会看到不同的结果。例如,unutbu报告在Ubuntu 14.04、Python 3.4.3和numpy 1.14.0.dev0+b7050a9下进行这些测试时看到以下结果:
>>> %timeit cartesian_product_transpose(x500, y500) 
1000 loops, best of 3: 682 µs per loop
>>> %timeit cartesian_product(x500, y500)
1000 loops, best of 3: 1.55 ms per loop

以下是我之前进行的一些测试的细节。这些方法的相对性能随着时间、硬件和Python及numpy版本的不同而发生了变化。虽然对于使用最新版本numpy的人来说并不立即有用,但它说明了自第一个答案版本以来事情发生了怎样的变化。
一个简单的替代方案:meshgrid + dstack
目前被接受的答案使用tile和repeat将两个数组广播在一起。但meshgrid函数实际上也可以做到几乎相同的事情。以下是传递给转置函数之前tile和repeat的输出:
In [1]: import numpy
In [2]: x = numpy.array([1,2,3])
   ...: y = numpy.array([4,5])
   ...: 

In [3]: [numpy.tile(x, len(y)), numpy.repeat(y, len(x))]
Out[3]: [array([1, 2, 3, 1, 2, 3]), array([4, 4, 4, 5, 5, 5])]

这里是meshgrid的输出结果:

In [4]: numpy.meshgrid(x, y)
Out[4]: 
[array([[1, 2, 3],
        [1, 2, 3]]), array([[4, 4, 4],
        [5, 5, 5]])]

如您所见,它们几乎完全相同。我们只需要重新塑造结果即可获得完全相同的结果。

In [5]: xt, xr = numpy.meshgrid(x, y)
   ...: [xt.ravel(), xr.ravel()]
Out[5]: [array([1, 2, 3, 1, 2, 3]), array([4, 4, 4, 5, 5, 5])]

不过,我们可以先将 meshgrid 的输出传递给 dstack,然后再进行重塑,从而节省一些工作,而不是在此时进行重塑:

In [6]: numpy.dstack(numpy.meshgrid(x, y)).reshape(-1, 2)
Out[6]: 
array([[1, 4],
       [2, 4],
       [3, 4],
       [1, 5],
       [2, 5],
       [3, 5]])

this comment中的说法相反,我没有看到不同的输入会产生形状不同的输出的证据。如上所示,它们执行非常相似的操作,因此如果确实存在这样的情况,就会非常奇怪。如果您找到反例,请告诉我。

测试meshgrid + dstackrepeat + transpose的性能差异

这两种方法的相对性能随时间而变化。在早期的Python版本(2.7)中,对于小输入,使用meshgrid+dstack的结果明显更快。(请注意,这些测试来自答案的旧版本。)定义:

>>> def repeat_product(x, y):
...     return numpy.transpose([numpy.tile(x, len(y)), 
                                numpy.repeat(y, len(x))])
...
>>> def dstack_product(x, y):
...     return numpy.dstack(numpy.meshgrid(x, y)).reshape(-1, 2)
...     

对于中等规模的输入,我看到了显著的加速。但是我用更新版本的Python(3.6.1)和numpy(1.12.1)在一台新的机器上重新运行了这些测试。现在这两种方法几乎相同。
旧测试
>>> x, y = numpy.arange(500), numpy.arange(500)
>>> %timeit repeat_product(x, y)
10 loops, best of 3: 62 ms per loop
>>> %timeit dstack_product(x, y)
100 loops, best of 3: 12.2 ms per loop

新测试

In [7]: x, y = numpy.arange(500), numpy.arange(500)
In [8]: %timeit repeat_product(x, y)
1.32 ms ± 24.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
In [9]: %timeit dstack_product(x, y)
1.26 ms ± 8.47 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

作为一直以来的情况,具体情况各有不同,但这表明在最近的Python和numpy版本中,它们是可以互换使用的。

广义乘积函数

通常情况下,我们可能期望对于小输入使用内置函数会更快,而对于大输入,一个专门设计的函数可能更快。此外,对于广义的n维乘积,tile和repeat都没有明显的高维类比。因此值得研究专门设计函数的行为。
大多数相关测试都在本答案开头出现,但以下仍然列出了一些早期Python和numpy版本进行比较的测试。
another answer中定义的cartesian函数曾经在处理更大的输入时表现良好。(它与上面称为cartesian_product_recursive的函数相同。)为了将cartesian与dstack_product进行比较,我们只使用两个维度。
在这里,旧测试显示出明显的差异,而新测试几乎没有显示出任何差异。 旧测试
>>> x, y = numpy.arange(1000), numpy.arange(1000)
>>> %timeit cartesian([x, y])
10 loops, best of 3: 25.4 ms per loop
>>> %timeit dstack_product(x, y)
10 loops, best of 3: 66.6 ms per loop

新测试

In [10]: x, y = numpy.arange(1000), numpy.arange(1000)
In [11]: %timeit cartesian([x, y])
12.1 ms ± 199 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [12]: %timeit dstack_product(x, y)
12.7 ms ± 334 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

与以前一样,dstack_product 在较小的尺度下仍然优于 cartesian新测试不显示冗余旧测试
In [13]: x, y = numpy.arange(100), numpy.arange(100)
In [14]: %timeit cartesian([x, y])
215 µs ± 4.75 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
In [15]: %timeit dstack_product(x, y)
65.7 µs ± 1.15 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

我认为这些区分很有意思,值得记录;但最终它们是学术性的。正如本答案开头所示的测试显示的那样,所有这些版本几乎总是比本答案开头定义的cartesian_product函数慢 -- 而cartesian_product本身略慢于该问题答案中最快的实现。


1
arr = np.empty()中添加dtype=object可以允许在积中使用不同的类型,例如:arrays = [np.array([1,2,3]), ['str1', 'str2']] - user3820991
非常感谢您提供的创新解决方案。只是想让您知道,根据用户的机器操作系统、Python或NumPy版本,一些用户可能会发现cartesian_product_transposecartesian_product更快。例如,在Ubuntu 14.04,python3.4.3,numpy 1.14.0.dev0+b7050a9上,%timeit cartesian_product_transpose(x500,y500)产生1000 loops, best of 3: 682 µs per loop,而%timeit cartesian_product(x500,y500)产生1000 loops, best of 3: 1.55 ms per loop。我还发现在len(arrays) > 2时,cartesian_product_transpose可能会更快。 - unutbu
此外,cartesian_product 返回一个浮点数数组,而 cartesian_product_transpose 返回与第一个(广播)数组相同的 dtype 的数组。在使用整数数组时保留 dtype 的能力可能是用户偏爱 cartesian_product_transpose 的原因之一。 - unutbu
@unutbu 再次感谢 - 正如我应该知道的那样,克隆dtype不仅方便,有时还可以使代码的速度提高20-30%。 - senderle
1
@senderle:哇,太棒了!而且,我突然想到可以使用类似于dtype = np.find_common_type([arr.dtype for arr in arrays], [])的方法来查找所有数组的公共数据类型,而不是强制用户将控制数据类型的数组放在第一位。 - unutbu
显示剩余2条评论

124

1
这种方法的优点是对于相同大小的数组产生一致的输出。meshgrid + dstack方法在某些情况下更快,但如果您期望以相同顺序构建相同大小的数组的笛卡尔积,则可能会导致错误。 - tlnagy
3
@tlnagy,我还没有注意到这种方法产生不同于meshgrid + dstack的结果的情况。你能否提供一个例子? - senderle

66
你可以在Python中使用普通的列表推导式。
x = numpy.array([1,2,3])
y = numpy.array([4,5])
[[x0, y0] for x0 in x for y0 in y]

这应该会给你

[[1, 4], [1, 5], [2, 4], [2, 5], [3, 4], [3, 5]]

完美!除了它会产生一个长度为nm的一维数组,而不是一个n乘m的二维数组。但这很有用。例如,您可以将[x0,y0]更改为x0y0,这可以用于将两个1d分布(在2d图上绘制为曲线)相乘以获得2d分布(在3d图上绘制为曲面)。就像这里将两个1d二项式分布相乘以获得2d多元二项式分布一样:https://upload.wikimedia.org/wikipedia/commons/8/8e/MultivariateNormal.png - Kukuster
3
天啊!如果你需要一个长度为n乘m的二维数组,只需在一个单独的推导式中包装一个循环:不要使用[(x0, y0) for x0 in x for y0 in y],而是使用[[(x0, y0) for x0 in x] for y0 in y] - Kukuster

49

我也对此很感兴趣,并进行了一些性能比较,可能比@senderle的答案更清晰。

对于两个数组(经典情况):

enter image description here

对于四个数组:

enter image description here

(请注意,这里数组的长度只有几十个条目。)


重现绘图的代码:

from functools import reduce
import itertools
import numpy
import perfplot


def dstack_product(arrays):
    return numpy.dstack(numpy.meshgrid(*arrays, indexing="ij")).reshape(-1, len(arrays))


# Generalized N-dimensional products
def cartesian_product(arrays):
    la = len(arrays)
    dtype = numpy.find_common_type([a.dtype for a in arrays], [])
    arr = numpy.empty([len(a) for a in arrays] + [la], dtype=dtype)
    for i, a in enumerate(numpy.ix_(*arrays)):
        arr[..., i] = a
    return arr.reshape(-1, la)


def cartesian_product_transpose(arrays):
    broadcastable = numpy.ix_(*arrays)
    broadcasted = numpy.broadcast_arrays(*broadcastable)
    rows, cols = reduce(numpy.multiply, broadcasted[0].shape), len(broadcasted)
    dtype = numpy.find_common_type([a.dtype for a in arrays], [])

    out = numpy.empty(rows * cols, dtype=dtype)
    start, end = 0, rows
    for a in broadcasted:
        out[start:end] = a.reshape(-1)
        start, end = end, end + rows
    return out.reshape(cols, rows).T


# from https://dev59.com/V3M_5IYBdhLWcg3w1G6N#1235363
def cartesian_product_recursive(arrays, out=None):
    arrays = [numpy.asarray(x) for x in arrays]
    dtype = arrays[0].dtype

    n = numpy.prod([x.size for x in arrays])
    if out is None:
        out = numpy.zeros([n, len(arrays)], dtype=dtype)

    m = n // arrays[0].size
    out[:, 0] = numpy.repeat(arrays[0], m)
    if arrays[1:]:
        cartesian_product_recursive(arrays[1:], out=out[0:m, 1:])
        for j in range(1, arrays[0].size):
            out[j * m : (j + 1) * m, 1:] = out[0:m, 1:]
    return out


def cartesian_product_itertools(arrays):
    return numpy.array(list(itertools.product(*arrays)))


perfplot.show(
    setup=lambda n: 2 * (numpy.arange(n, dtype=float),),
    n_range=[2 ** k for k in range(13)],
    # setup=lambda n: 4 * (numpy.arange(n, dtype=float),),
    # n_range=[2 ** k for k in range(6)],
    kernels=[
        dstack_product,
        cartesian_product,
        cartesian_product_transpose,
        cartesian_product_recursive,
        cartesian_product_itertools,
    ],
    logx=True,
    logy=True,
    xlabel="len(a), len(b)",
    equality_check=None,
)

26
在 @senderle 的杰出工作基础上,我提出了两个版本——一个用于 C 布局,一个用于 Fortran 布局——通常会更快一些。
  • cartesian_product_transpose_pp 是一种使用更有利的转置内存布局和一些非常小的优化的 cartesion_product 版本,与 @senderle 的 cartesian_product_transpose 使用完全不同的策略。
  • cartesian_product_pp 保持原始内存布局。使它变快的是它使用连续复制。连续复制比仅复制有效位更快,因此即使只有部分数据包含有效数据,复制整个内存块也比仅复制有效位更好。

一些性能图表。我为 C 和 Fortran 布局分别制作了不同的图表,因为这些在我看来是不同的任务。

以 'pp' 结尾的名称是我的方法。

1) 许多微小因素(每个因素都有 2 个元素)

enter image description hereenter image description here

2) 许多小因素(每个4个元素)

enter image description hereenter image description here

3) 三个长度相等的因素

enter image description hereenter image description here

4) 长度相等的两个因素

enter image description hereenter image description here

代码(需要为每个图进行单独运行,因为我无法弄清如何重置;还需要适当编辑/注释):

import numpy
import numpy as np
from functools import reduce
import itertools
import timeit
import perfplot

def dstack_product(arrays):
    return numpy.dstack(
        numpy.meshgrid(*arrays, indexing='ij')
        ).reshape(-1, len(arrays))

def cartesian_product_transpose_pp(arrays):
    la = len(arrays)
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty((la, *map(len, arrays)), dtype=dtype)
    idx = slice(None), *itertools.repeat(None, la)
    for i, a in enumerate(arrays):
        arr[i, ...] = a[idx[:la-i]]
    return arr.reshape(la, -1).T

def cartesian_product(arrays):
    la = len(arrays)
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty([len(a) for a in arrays] + [la], dtype=dtype)
    for i, a in enumerate(numpy.ix_(*arrays)):
        arr[...,i] = a
    return arr.reshape(-1, la)

def cartesian_product_transpose(arrays):
    broadcastable = numpy.ix_(*arrays)
    broadcasted = numpy.broadcast_arrays(*broadcastable)
    rows, cols = numpy.prod(broadcasted[0].shape), len(broadcasted)
    dtype = numpy.result_type(*arrays)

    out = numpy.empty(rows * cols, dtype=dtype)
    start, end = 0, rows
    for a in broadcasted:
        out[start:end] = a.reshape(-1)
        start, end = end, end + rows
    return out.reshape(cols, rows).T

from itertools import accumulate, repeat, chain

def cartesian_product_pp(arrays, out=None):
    la = len(arrays)
    L = *map(len, arrays), la
    dtype = numpy.result_type(*arrays)
    arr = numpy.empty(L, dtype=dtype)
    arrs = *accumulate(chain((arr,), repeat(0, la-1)), np.ndarray.__getitem__),
    idx = slice(None), *itertools.repeat(None, la-1)
    for i in range(la-1, 0, -1):
        arrs[i][..., i] = arrays[i][idx[:la-i]]
        arrs[i-1][1:] = arrs[i]
    arr[..., 0] = arrays[0][idx]
    return arr.reshape(-1, la)

def cartesian_product_itertools(arrays):
    return numpy.array(list(itertools.product(*arrays)))


# from https://dev59.com/V3M_5IYBdhLWcg3w1G6N#1235363
def cartesian_product_recursive(arrays, out=None):
    arrays = [numpy.asarray(x) for x in arrays]
    dtype = arrays[0].dtype

    n = numpy.prod([x.size for x in arrays])
    if out is None:
        out = numpy.zeros([n, len(arrays)], dtype=dtype)

    m = n // arrays[0].size
    out[:, 0] = numpy.repeat(arrays[0], m)
    if arrays[1:]:
        cartesian_product_recursive(arrays[1:], out=out[0:m, 1:])
        for j in range(1, arrays[0].size):
            out[j*m:(j+1)*m, 1:] = out[0:m, 1:]
    return out

### Test code ###
if False:
  perfplot.save('cp_4el_high.png',
    setup=lambda n: n*(numpy.arange(4, dtype=float),),
                n_range=list(range(6, 11)),
    kernels=[
        dstack_product,
        cartesian_product_recursive,
        cartesian_product,
#        cartesian_product_transpose,
        cartesian_product_pp,
#        cartesian_product_transpose_pp,
        ],
    logx=False,
    logy=True,
    xlabel='#factors',
    equality_check=None
    )
else:
  perfplot.save('cp_2f_T.png',
    setup=lambda n: 2*(numpy.arange(n, dtype=float),),
    n_range=[2**k for k in range(5, 11)],
    kernels=[
#        dstack_product,
#        cartesian_product_recursive,
#        cartesian_product,
        cartesian_product_transpose,
#        cartesian_product_pp,
        cartesian_product_transpose_pp,
        ],
    logx=True,
    logy=True,
    xlabel='length of each factor',
    equality_check=None
    )

1
感谢您分享这个优秀的答案。当cartesian_product_transpose_pp(arrays)中的arrays大小超过一定值时,将会出现MemoryError。在这种情况下,我希望该函数能够产生更小的结果块。我在问题上发布了一个相关的问题。您能回答我的问题吗?谢谢。 - Sun Bear
当 dtype 为 object 且因子长度不同时,它是否有效? - amarchin

23

从2017年10月份开始,numpy现在有了一个通用的np.stack函数,该函数带有一个轴参数。使用它,我们可以使用“dstack和meshgrid”技术来获得“广义笛卡尔积”:

import numpy as np

def cartesian_product(*arrays):
    ndim = len(arrays)
    return (np.stack(np.meshgrid(*arrays), axis=-1)
              .reshape(-1, ndim))

a = np.array([1,2])
b = np.array([10,20])
cartesian_product(a,b)

# output:
# array([[ 1, 10],
#        [ 2, 10],
#        [ 1, 20],
#        [ 2, 20]])  

关于axis=-1参数的说明。这是结果中的最后一个(内部)轴。它等价于使用axis=ndim

另外要注意的一点是,由于笛卡尔积很快就会爆炸增长,除非出于某种原因我们需要在内存中实现数组,否则如果积非常大,我们可能需要利用itertools并即时使用值。


12

Scikit-learn包中快速实现了这一点:

from sklearn.utils.extmath import cartesian
product = cartesian((x,y))

请注意,如果您关心输出的顺序,则此实现的惯例与您所希望的不同。 若要进行精确排序,请执行以下操作
product = cartesian((y,x))[:, ::-1]

这个比 @senderle 的函数更快吗? - cs95
2
@cᴏʟᴅsᴘᴇᴇᴅ 我还没有测试过。我原本希望这是用C或Fortran实现的,因此几乎无法被击败,但是看起来它是使用NumPy编写的。因此,这个函数很方便,但应该不会比自己使用NumPy构造的函数快得多。 - jmd_dk

8

我曾经使用过@kennytm的答案,但当我尝试在TensorFlow中执行相同操作时,我发现TensorFlow没有numpy.repeat()的等效函数。经过一番试验,我认为我找到了一个更通用的解决方案,适用于任意点向量。

对于NumPy:

import numpy as np

def cartesian_product(*args: np.ndarray) -> np.ndarray:
    """
    Produce the cartesian product of arbitrary length vectors.

    Parameters
    ----------
    np.ndarray args
        vector of points of interest in each dimension

    Returns
    -------
    np.ndarray
        the cartesian product of size [m x n] wherein:
            m = prod([len(a) for a in args])
            n = len(args)
    """
    for i, a in enumerate(args):
        assert a.ndim == 1, "arg {:d} is not rank 1".format(i)
    return np.concatenate([np.reshape(xi, [-1, 1]) for xi in np.meshgrid(*args)], axis=1)

而对于TensorFlow:

import tensorflow as tf

def cartesian_product(*args: tf.Tensor) -> tf.Tensor:
    """
    Produce the cartesian product of arbitrary length vectors.

    Parameters
    ----------
    tf.Tensor args
        vector of points of interest in each dimension

    Returns
    -------
    tf.Tensor
        the cartesian product of size [m x n] wherein:
            m = prod([len(a) for a in args])
            n = len(args)
    """
    for i, a in enumerate(args):
        tf.assert_rank(a, 1, message="arg {:d} is not rank 1".format(i))
    return tf.concat([tf.reshape(xi, [-1, 1]) for xi in tf.meshgrid(*args)], axis=1)

4
更一般地说,如果你有两个2d的numpy数组a和b,并且你想要将a的每一行与b的每一行连接起来(行的笛卡尔积,类似于数据库中的join),你可以使用以下方法:
import numpy
def join_2d(a, b):
    assert a.dtype == b.dtype
    a_part = numpy.tile(a, (len(b), 1))
    b_part = numpy.repeat(b, len(a), axis=0)
    return numpy.hstack((a_part, b_part))

3
你能够通过将生成器表达式与map函数结合起来来实现最快的速度:
import numpy
import datetime
a = np.arange(1000)
b = np.arange(200)

start = datetime.datetime.now()

foo = (item for sublist in [list(map(lambda x: (x,i),a)) for i in b] for item in sublist)

print (list(foo))

print ('execution time: {} s'.format((datetime.datetime.now() - start).total_seconds()))

输出结果(实际上是整个结果列表被打印出来):

[(0, 0), (1, 0), ...,(998, 199), (999, 199)]
execution time: 1.253567 s

或者使用双重生成器表达式:

a = np.arange(1000)
b = np.arange(200)

start = datetime.datetime.now()

foo = ((x,y) for x in a for y in b)

print (list(foo))

print ('execution time: {} s'.format((datetime.datetime.now() - start).total_seconds()))

输出(完整列表打印):

[(0, 0), (1, 0), ...,(998, 199), (999, 199)]
execution time: 1.187415 s

请注意,大部分计算时间用于打印命令。生成器的计算效率还是不错的。在不打印计算时间的情况下为:

execution time: 0.079208 s

对于生成器表达式 + map 函数:

execution time: 0.007093 s

针对双生成器表达式的翻译。

如果您实际想要计算每个坐标对的实际乘积,最快的方法是将其解决为numpy矩阵乘积:

a = np.arange(1000)
b = np.arange(200)

start = datetime.datetime.now()

foo = np.dot(np.asmatrix([[i,0] for i in a]), np.asmatrix([[i,0] for i in b]).T)

print (foo)

print ('execution time: {} s'.format((datetime.datetime.now() - start).total_seconds()))

输出:

 [[     0      0      0 ...,      0      0      0]
 [     0      1      2 ...,    197    198    199]
 [     0      2      4 ...,    394    396    398]
 ..., 
 [     0    997   1994 ..., 196409 197406 198403]
 [     0    998   1996 ..., 196606 197604 198602]
 [     0    999   1998 ..., 196803 197802 198801]]
execution time: 0.003869 s

而且不需要打印(在这种情况下,它并没有节省太多空间,因为实际上只有一个小部分的矩阵被打印出来):

execution time: 0.003083 s

对于产品计算,外部产品广播foo = a [:,None] * b更快。使用您的定时方法而不使用print(foo),它是0.001103s与0.002225s。使用timeit,它是304μs vs 1.6ms 。矩阵已知比ndarray慢,所以我尝试了np.array的代码,但仍然比广播慢(1.57ms)。 - syockit

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接