不同大小的数组的逐元素操作

3
什么是最快且最符合Python风格的方法,可以在不过采样较小数组的情况下执行不同大小数组的元素逐个操作?
例如: 我有一个大数组A 1000x1000和一个小数组B 10x10,我想让B中的每个元素响应于A中的100x100个元素。没有任何插值的必要,只需为所有10000次操作使用B中相同的元素。
我可以调整两个数组的大小,使A的形状成为B的倍数。在所有维度上它们通常是1:10000或1:1000。这些数组表示具有不同分辨率但相同范围的数据样本。
我知道可以通过使用Kronecker乘积等方式过采样数组B,但保持数组B小会更好,特别是因为我的某些数组处理和存储非常大。我使用xarray和dask,但任何numpy操作也都适用。
希望这段摘录能够解释我想做什么:
import numpy as np

A = np.random.rand(10,10)
B = np.random.rand(1000,1000)

res = np.shape(B)[0]//np.shape(A)[0]

#I want to add A and B so that each element in A is added to 100x100 elements in B.
#This doesn't work of obvious reasons:
#C = A+B

#This solution sacrifices the resolution of B:
C = A+B[::res,::res]

#These solutions creates an unnecessary large array for the operation(don't they?):
K = np.ones((res,res))
C = np.kron(A, K) + B

C = np.repeat(np.repeat(A,res, axis=0), res, axis=1)+B

我有一种感觉,这个问题以前可能已经出现过了,但我没有找到任何一个适用于这种特殊情况的答案。


2个回答

3

我曾遇到相似的问题,最终的解决方式如下:

import numpy as np
import numba as nb
import time

@nb.jit(nb.void(nb.float64[:,:], nb.float64[:,:]), nopython=True, cache=True)
def func(A, B):
    # Assume different resolution along the different axes
    res_row = B.shape[0]//A.shape[0]
    res_col = B.shape[1]//A.shape[1]

    for i in range(A.shape[0]):
        for j in range(A.shape[1]):
            # Example to add, can be changed to any other function
            B[i * res_row : (i+1) * res_row, j * res_col : (j+1) * res_col] += A[i,j]


a = np.random.rand(1000,1000)
b = np.random.rand(10000,10000)

t1 = time.time()
func(a,b)
print("Time taken = {time_taken}".format(time_taken = time.time() - t1))
# with jitting on 8GB, 8 cores MacBook Pro = 0.45
# without jitting = 5.46

虽然numba与这个问题无关,但这是我解决问题的方法,使用JIT编译可以显著提高性能。


1
这是一个很好的 JIT 使用示例,可以在许多情况下使用。谢谢! - user2821

3

通过广播,我们可以将一个小数组“复制”以便与一个更大的数组一起使用。例如,如果

a = np.random.rand(10)
b = np.random.rand(1000).reshape(10,100)
a[:,None]+b

这里的技巧是将A的每个元素与B的一个(100,100)块配对。为此,我们需要进行重塑和转置。
In [3]: A = np.random.rand(10,10)
   ...: B = np.random.rand(1000,1000)
   ...: res = np.shape(B)[0]//np.shape(A)[0]

您的目标:

In [4]: K = np.ones((res,res))
   ...: C = np.kron(A, K) + B
   ...: 
   ...: C = np.repeat(np.repeat(A,res, axis=0), res, axis=1)+B

In [5]: C.shape
Out[5]: (1000, 1000)

B 分成这些 (100,100) 的块:

In [7]: B1 = B.reshape(10,100,10,100).transpose(0,2,1,3)

In [8]: B1.shape
Out[8]: (10, 10, 100, 100)

现在我们可以使用广播进行添加。
In [9]: C1 = B1 + A[:,:,None,None]

In [10]: C1.shape
Out[10]: (10, 10, 100, 100)

将其重新塑造为原始的B形状:

In [11]: C1=C1.transpose(0,2,1,3).reshape(B.shape)

它们匹配:

In [12]: np.allclose(C,C1)
Out[12]: True

在我的笔记本电脑上,这种方法比使用jit的方法更快。 - PAb
简单易懂,像Python一样。谢谢,我没想到! - user2821

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接