Python:将一个循环的numpy数学函数重写为在GPU上运行

26

有人能帮我重写这个函数(doTheMath函数)以便在GPU上进行计算吗?我已经花了几天的时间试图理解它,但没有结果。我想知道是否有人可以帮我重写这个函数,任何你认为合适的方式都可以,只要最终给出相同的结果即可。我尝试使用numba@jit,但由于某种原因它实际上比通常运行代码还要慢得多。对于一个巨大的样本大小,目标是大大减少执行时间,所以自然我认为使用GPU是最快的方法。

我会稍微解释一下实际发生了什么。真实数据,看起来几乎与以下代码中创建的样本数据相同,被分成约500万行的样本大小或每个文件约150MB左右。总共有约6亿行或20GB的数据。我必须按样本循环遍历这些数据,然后在每个样本中逐行地取最后的2000行(或其他行数),并运行doTheMath函数,该函数返回结果。然后将该结果保存回硬盘,在另一个程序中可以执行其他操作。如下所示,我不需要所有行的所有结果,只需要大于特定金额的结果。如果我现在像python中一样运行我的函数,每100万行大约需要62秒。考虑到所有的数据和应该如何快速完成,这是一个非常长的时间。

我必须提到的是,我使用data = joblib.load(file)将真实数据以文件为单位上传到RAM中,因此上传数据不是问题,因为每个文件只需要约0.29秒钟的时间。上传后,我运行下面的整个代码。最耗时的是doTheMath函数。我愿意把我在stackoverflow上拥有的全部500声望点作为奖励,给愿意帮我重写这个简单代码以在GPU上运行的人。我的兴趣特别在于GPU,我真的想看看它是如何解决这个问题的。

编辑/更新1: 这里是真实数据的小样本链接:data_csv.zip 约102000行真实数据1和2000行真实数据2a和data2b。在真实样本数据上使用minimumLimit = 400

编辑/更新2: 对于那些关注这篇文章的人,以下是答案的简要总结。到目前为止,我们对原始解决方案有4个答案。由@Divakar提供的代码只是原始代码的微调。这两个微调中只有第一个实际上适用于此问题,第二个是一个好的微调,但在这里不适用。在其他三个答案中,有两个是基于CPU的解决方案,一个是基于tensorflow-GPU。Paul Panzer的Tensorflow-GPU看起来很有前途,但当我实际在GPU上运行它时,它比原来的还要慢,所以代码仍然需要改进。

另外两个基于CPU的解决方案由@PaulPanzer(纯numpy解决方案)和@MSeifert(numba解决方案)提交。这两个解决方案都给出了非常好的结果,并且与原始代码相比处理数据的速度非常快。其中,由Paul Panzer提交的那

import pandas as pd
import numpy as np
import time

def doTheMath(tmpData1, data2a, data2b):
    A = tmpData1[:, 0]
    B = tmpData1[:,1]
    C = tmpData1[:,2]
    D = tmpData1[:,3]
    Bmax = B.max()
    Cmin  = C.min()
    dif = (Bmax - Cmin)
    abcd = ((((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
    return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

#Declare variables
batchSize = 2000
sampleSize = 5000000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data 

#Create Random Sample Data
data1 = np.matrix(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #upper limit
data2b = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #lower limit
#approx. half of data2a will be smaller than data2b, but that is only in the sample data because it is randomly generated, NOT the real data. The real data2a is always higher than data2b.


#Loop through the data
t0 = time.time()
for rowNr in  range(data1.shape[0]):
    tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
    if(tmp_df.shape[0] == batchSize):
        result = doTheMath(tmp_df, data2a, data2b)
        if (result >= minimumLimit):
            resultArray.append([rowNr , result])
print('Runtime:', time.time() - t0)

#Save data results
resultArray = np.array(resultArray)
print(resultArray[:,1].sum())
resultArray = pd.DataFrame({'index':resultArray[:,0], 'result':resultArray[:,1]})
resultArray.to_csv("Result Array.csv", sep=';')

我正在处理的电脑配置:

GTX970(4gb) video card; 
i7-4790K CPU 4.00Ghz; 
16GB RAM;
a SSD drive 
running Windows 7; 
作为一个旁问,第二张支持SLI的显卡对于解决这个问题有帮助吗?

SLI与CUDA无关。如果想要将代码转换为CUDA,你需要坐在电脑前,手动输入新的CUDA内核代码。如果你想在两个GPU上运行它,你还需要输入API代码来管理在两个GPU上运行代码的过程。 - talonmies
你可以尝试使用numba,它可以尝试自动地使用CUDA。更好的方法是使用Theano/TensorFlow的计算图,在它们的框架内实现你的算法以编译为GPU可用的代码。但总的来说,重要的是了解CUDA,并使用像talonmies提到的可用工具自定义设计算法。 - sascha
感谢@sascha的建议。我之前认为Theano和Tensorflow只适用于机器学习问题。我现在会研究一下Numba。 - RaduS
1
我认为最大的改进之一是使用初始化的输出数组:resultArray,然后在每次迭代中索引到它以进行更新,而不是从空列表开始并使用缓慢的append - Divakar
感谢Divakar提出的调整建议。 - RaduS
显示剩余3条评论
5个回答

11

介绍和解决方案代码

好的,你提出了要求!因此,在本文中列出了一种使用PyCUDA实现的轻量级包装器,扩展了Python环境中大部分CUDA功能。我们将使用它的SourceModule功能,该功能允许我们在Python环境中编写和编译CUDA内核。

针对手头的问题,涉及到滑动最大值和最小值、少量差异和除法以及比较等计算。对于最大值和最小值部分,这涉及块最大值查找(对于每个滑动窗口),我们将使用减少技术,如这里详细讨论的那样。这将在块级别完成。对于跨滑动窗口的上层迭代,我们将使用网格级索引进入CUDA资源。有关此块和网格格式的更多信息,请参见第18页。 PyCUDA还支持用于计算最大值和最小值等约简的内置函数,但我们会失去控制,具体来说,我们打算使用专用内存(如共享内存和常量内存)来利用GPU接近最佳水平。

列出PyCUDA-NumPy解决方案代码 -

1] PyCUDA部分 -

import pycuda.autoinit
import pycuda.driver as drv
import numpy as np
from pycuda.compiler import SourceModule

mod = SourceModule("""
#define TBP 1024 // THREADS_PER_BLOCK

__device__ void get_Bmax_Cmin(float* out, float *d1, float *d2, int L, int offset)
{
    int tid = threadIdx.x;
    int inv = TBP;
    __shared__ float dS[TBP][2];

    dS[tid][0] = d1[tid+offset];  
    dS[tid][1] = d2[tid+offset];         
    __syncthreads();

    if(tid<L-TBP)  
    {
        dS[tid][0] = fmaxf(dS[tid][0] , d1[tid+inv+offset]);
        dS[tid][1] = fminf(dS[tid][1] , d2[tid+inv+offset]);
    }
    __syncthreads();
    inv = inv/2;

    while(inv!=0)   
    {
        if(tid<inv)
        {
            dS[tid][0] = fmaxf(dS[tid][0] , dS[tid+inv][0]);
            dS[tid][1] = fminf(dS[tid][1] , dS[tid+inv][1]);
        }
        __syncthreads();
        inv = inv/2;
    }
    __syncthreads();

    if(tid==0)
    {
        out[0] = dS[0][0];
        out[1] = dS[0][1];
    }   
    __syncthreads();
}

__global__ void main1(float* out, float *d0, float *d1, float *d2, float *d3, float *lowL, float *highL, int *BLOCKLEN)
{
    int L = BLOCKLEN[0];
    int tid = threadIdx.x;
    int iterID = blockIdx.x;
    float Bmax_Cmin[2];
    int inv;
    float Cmin, dif;   
    __shared__ float dS[TBP*2];   

    get_Bmax_Cmin(Bmax_Cmin, d1, d2, L, iterID);  
    Cmin = Bmax_Cmin[1];
    dif = (Bmax_Cmin[0] - Cmin);

    inv = TBP;

    dS[tid] = (d0[tid+iterID] + d1[tid+iterID] + d2[tid+iterID] + d3[tid+iterID] - 4.0*Cmin) / (4.0*dif);
    __syncthreads();

    if(tid<L-TBP)  
        dS[tid+inv] = (d0[tid+inv+iterID] + d1[tid+inv+iterID] + d2[tid+inv+iterID] + d3[tid+inv+iterID] - 4.0*Cmin) / (4.0*dif);                   

     dS[tid] = ((dS[tid] >= lowL[tid]) & (dS[tid] <= highL[tid])) ? 1 : 0;
     __syncthreads();

     if(tid<L-TBP)
         dS[tid] += ((dS[tid+inv] >= lowL[tid+inv]) & (dS[tid+inv] <= highL[tid+inv])) ? 1 : 0;
     __syncthreads();

    inv = inv/2;
    while(inv!=0)   
    {
        if(tid<inv)
            dS[tid] += dS[tid+inv];
        __syncthreads();
        inv = inv/2;
    }

    if(tid==0)
        out[iterID] = dS[0];
    __syncthreads();

}
""")

请注意,THREADS_PER_BLOCK, TBP应根据batchSize进行设置。经验法则是将2的幂次方值分配给TBP,该值略小于batchSize。因此,对于batchSize = 2000,我们需要将TBP设置为1024
2] NumPy部分 -
def gpu_app_v1(A, B, C, D, batchSize, minimumLimit):
    func1 = mod.get_function("main1")
    outlen = len(A)-batchSize+1

    # Set block and grid sizes
    BSZ = (1024,1,1)
    GSZ = (outlen,1)

    dest = np.zeros(outlen).astype(np.float32)
    N = np.int32(batchSize)
    func1(drv.Out(dest), drv.In(A), drv.In(B), drv.In(C), drv.In(D), \
                     drv.In(data2b), drv.In(data2a),\
                     drv.In(N), block=BSZ, grid=GSZ)
    idx = np.flatnonzero(dest >= minimumLimit)
    return idx, dest[idx]

基准测试

我已经在GTX 960M上进行了测试。请注意,PyCUDA希望数组是连续的顺序。因此,我们需要对列进行切片并制作副本。我期望/假设数据可以从文件中读取,使得数据沿行而不是列传播。因此,现在将它们保留在基准测试函数之外。

原始方法 -

def org_app(data1, batchSize, minimumLimit):
    resultArray = []
    for rowNr in  range(data1.shape[0]-batchSize+1):
        tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
        result = doTheMath(tmp_df, data2a, data2b)
        if (result >= minimumLimit):
            resultArray.append([rowNr , result]) 
    return resultArray

时间和验证 -

In [2]: #Declare variables
   ...: batchSize = 2000
   ...: sampleSize = 50000
   ...: resultArray = []
   ...: minimumLimit = 490 #use 400 on the real sample data
   ...: 
   ...: #Create Random Sample Data
   ...: data1 = np.random.uniform(1, 100000, (sampleSize + batchSize, 4)).astype(np.float32)
   ...: data2b = np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: data2a = data2b + np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: 
   ...: # Make column copies
   ...: A = data1[:,0].copy()
   ...: B = data1[:,1].copy()
   ...: C = data1[:,2].copy()
   ...: D = data1[:,3].copy()
   ...: 
   ...: gpu_out1,gpu_out2 = gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
   ...: cpu_out1,cpu_out2 = np.array(org_app(data1, batchSize, minimumLimit)).T
   ...: print(np.allclose(gpu_out1, cpu_out1))
   ...: print(np.allclose(gpu_out2, cpu_out2))
   ...: 
True
False

所以,CPU和GPU计算之间存在一些差异。让我们来研究一下它们 -

In [7]: idx = np.flatnonzero(~np.isclose(gpu_out2, cpu_out2))

In [8]: idx
Out[8]: array([12776, 15208, 17620, 18326])

In [9]: gpu_out2[idx] - cpu_out2[idx]
Out[9]: array([-1., -1.,  1.,  1.])

有四个实例的计数不匹配,最大偏差为1。经过研究,我发现了一些信息。基本上,由于我们使用数学内在函数进行最大值和最小值计算,我认为这些函数导致浮点表示中的最后一个二进制位与CPU不同。这被称为ULP误差,并已在这里这里详细讨论。

最后,抛开这个问题,让我们来关注最重要的部分,性能 -

In [10]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 2.18 s per loop

In [11]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
10 loops, best of 3: 82.5 ms per loop

In [12]: 2180.0/82.5
Out[12]: 26.424242424242426

让我们尝试更大的数据集。当 sampleSize = 500000 时,我们得到 -

In [14]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 23.2 s per loop

In [15]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
1 loops, best of 3: 821 ms per loop

In [16]: 23200.0/821
Out[16]: 28.25822168087698

所以,加速比保持在27左右。

限制:

1)我们使用float32数字,因为GPU最擅长处理这些数字。双精度特别是在非服务器GPU上并不流行,而且由于您正在使用这样的GPU,我进行了float32测试。

进一步改进:

1)我们可以使用更快的常量内存来提供data2adata2b,而不是使用全局内存


1
@RaduS 一定要查看已编辑的代码(刚刚编辑)以进行基准测试!现在它接受任意的 batchSize - Divakar
1
@RaduS 当然,我会在今晚之前完成 :) - Divakar
1
@RaduS已删除“澄清#1:值不匹配问题”的部分,因为问题似乎只是错误的添加部分 :) - Divakar
1
@RaduS 1,2,3,Boom!:D 啊,GPU 真是神奇!我之前正在学习 CUDA,通过你的悬赏,让我有了重新开始的动力,所以感谢你!还有很多东西要学呢。 - Divakar
1
@Divakar,我来恭喜你了!我本来还想再调整一下我的代码,但是你的实在是太棒了。 - Paul Panzer
显示剩余18条评论

10

微调 #1

通常建议在使用NumPy数组时对其进行向量化,但是对于非常大的数组,向量化可能无法实现。因此,为了提高性能,可以通过微调优化求数组和的最后一步。

我们可以替换制作1s0s数组并执行求和的步骤:

np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

使用 np.count_nonzero 可以高效地统计布尔数组中的 True 值数量,而无需将其转换为 1s0s

np.count_nonzero((abcd <= data2a) & (abcd >= data2b))

运行时测试 -

In [45]: abcd = np.random.randint(11,99,(10000))

In [46]: data2a = np.random.randint(11,99,(10000))

In [47]: data2b = np.random.randint(11,99,(10000))

In [48]: %timeit np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
10000 loops, best of 3: 81.8 µs per loop

In [49]: %timeit np.count_nonzero((abcd <= data2a) & (abcd >= data2b))
10000 loops, best of 3: 28.8 µs per loop

调整 #2

在处理隐式广播的情况时,使用预先计算的倒数。有更多信息,请参见此处。因此,在以下步骤中存储dif的倒数并使用它代替:

((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ...

样例测试 -

In [52]: A = np.random.rand(10000)

In [53]: dif = 0.5

In [54]: %timeit A/dif
10000 loops, best of 3: 25.8 µs per loop

In [55]: %timeit A*(1.0/dif)
100000 loops, best of 3: 7.94 µs per loop

您有四个地方使用了 dif 进行除法计算。因此,希望这也能在那些地方带来明显的提升!


嗨@Divakar,关于调整#2,我阅读了您提供的链接并尝试实施它。但似乎我没有得到相同的结果。也许我做错了什么。你能看一下吗?也许你更容易发现错误dif = 1.0 /(Bmax-Cmin)然后abcd =((dif * A)+((dif * B)+(dif * C)+(dif * D))/ 4) - RaduS
@RaduS 如果 BmaxCmin 接近,那么 Bmax - Cmin 将是一个小数,它的倒数将是一个大数。因此,当数组乘以该数字时,我们将得到不同的数字。所以,我们可能会跳过这个微调。 - Divakar

5

在开始调整目标(GPU)或使用其他内容(即并行执行)之前,您可能需要考虑如何改进已有的代码。您使用了标记,因此我将使用它来改进代码:首先,我们操作数组而不是矩阵:

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

每次调用 doTheMath,您都希望返回一个整数,但是您使用了很多数组并创建了许多中间数组:
abcd = ((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ((C   - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

这将在每个步骤中创建一个中间数组:

  • tmp1 = A-Cmin,
  • tmp2 = tmp1 / dif,
  • tmp3 = B - Cmin,
  • tmp4 = tmp3 / dif
  • ... 你懂的。

然而,这是一个减少函数(数组 -> 整数),因此拥有许多中间数组是不必要的负担,只需计算“飞行”的值即可。

import numba as nb

@nb.njit
def doTheMathNumba(tmpData, data2a, data2b):
    Bmax = np.max(tmpData[:, 1])
    Cmin = np.min(tmpData[:, 2])
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    sum_ = 0
    for i in range(tmpData.shape[0]):
        val = (tmpData[i, 0] + tmpData[i, 1] + tmpData[i, 2] + tmpData[i, 3]) / 4 * idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_

我在这里做了其他事情以避免多次操作:

(((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4
= ((A - Cmin + B - Cmin + C - Cmin + D - Cmin) / dif) / 4
= (A + B + C + D - 4 * Cmin) / (4 * dif)
= (A + B + C + D) / (4 * dif) - (Cmin / dif)

这实际上将我的计算机执行时间缩短了近10倍:
%timeit doTheMath(tmp_df, data2a, data2b)       # 1000 loops, best of 3: 446 µs per loop
%timeit doTheMathNumba(tmp_df, data2a, data2b)  # 10000 loops, best of 3: 59 µs per loop

当然还有其他的改进,例如使用滚动最小/最大值来计算BmaxCmin,这将使至少部分计算运行在O(sampleSize)而不是O(samplesize * batchsize)。这也将使得在下一个样本中可以重用一些 (A + B + C + D) / (4 * dif) - (Cmin / dif) 的计算,因为如果 CminBmax 不变,则这些值不会有所不同。虽然比较有些复杂,但肯定是可能的!请参见:此处

import time
import numpy as np
import numba as nb

@nb.njit
def doTheMathNumba(abcd, data2a, data2b, Bmax, Cmin):
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    quarter_idiff = 0.25 * idiff
    sum_ = 0
    for i in range(abcd.shape[0]):
        val = abcd[i] * quarter_idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_

@nb.njit
def doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, resultArray):
    found = 0
    for rowNr in range(data1.shape[0]):
        if(abcd[rowNr:rowNr + batchSize].shape[0] == batchSize):
            result = doTheMathNumba(abcd[rowNr:rowNr + batchSize], 
                                    data2a, data2b, Bmaxs[rowNr], Cmins[rowNr])
            if (result >= minimumLimit):
                resultArray[found, 0] = rowNr
                resultArray[found, 1] = result
                found += 1
    return resultArray[:found]

#Declare variables
batchSize = 2000
sampleSize = 50000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data 

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

from scipy import ndimage
t0 = time.time()
abcd = np.sum(data1, axis=1)
Bmaxs = ndimage.maximum_filter1d(data1[:, 1], 
                                 size=batchSize, 
                                 origin=-((batchSize-1)//2-1))  # correction for even shapes
Cmins = ndimage.minimum_filter1d(data1[:, 2], 
                                 size=batchSize, 
                                 origin=-((batchSize-1)//2-1))

result = np.zeros((sampleSize, 2), dtype=np.int64)
doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, result)
print('Runtime:', time.time() - t0)

这给了我一个运行时: 0.759593152999878(在numba编译函数之后!),而您的原始运行时为运行时:24.68975639343262。现在我们快了30倍!

在您的样本大小下,仍需运行时: 60.187848806381226,但这还不错,是吗?

即使我没有亲自做过这个,说可以编写"Numba for CUDA GPUs",并且它似乎并不太复杂。

这也将使得重复使用一些 (A + B + C + D) / (4 * dif) - (Cmin / dif) 的计算成为可能,因为如果 Cmin 和 Bmax 在下一个样本中不变,这些值就不会有所不同。这有点复杂...已经完成了,马上发布。它很快,我正在使用纯 numpy。 - Paul Panzer
好的,我必须更正我的前面说法,因为我做错了什么,它只快了30倍 :( - MSeifert
@PaulPanzer 是的,可以重新实现所有这些功能(而不是使用scipy过滤器),但我认为你的numpy代码相当复杂,并且在我的电脑上也更慢(不多,但几乎慢了2倍)。因此,我认为在这里“使用纯numpy”并没有优势。另外:Numba实际上可以编译GPU代码,尽管我自己还没有这样做。 :) - MSeifert
@RaduS 我不小心复制了带有 modecval 的尝试过滤器。我编辑了答案,现在应该可以工作了。 :) - MSeifert
scipy拥有scipy.ndimage.maximum_filter1d函数。 - user7138814
显示剩余7条评论

4

以下是一些演示代码,展示通过调整算法可以实现什么。这是纯numpy代码,在你发布的样本数据上比原始版本快了大约35倍(在我的相对较低配置电脑上,处理大约1,000,000个样本只需要约2.5秒):

>>> result_dict = master('run')
[('load', 0.82578349113464355), ('precomp', 0.028138399124145508), ('max/min', 0.24333405494689941), ('ABCD', 0.015314102172851562), ('main', 1.3356468677520752)]
TOTAL 2.44821691513

使用的修改:

  • A+B+C+D,详见我的其他答案
  • 运行min/max,包括避免使用相同的Cmin/dif多次计算(A+B+C+D-4Cmin)/(4dif)。

这些修改更多地是例行操作。这就留下了与数据2a/b的比较,这需要O(NK)的昂贵成本,其中N是样本数量,K是窗口大小。 在这里,我们可以利用相对良好的数据。使用running min/max,可以创建data2a/b的变体,可以用于一次测试一系列窗口偏移量,如果测试失败,则可以立即排除所有这些偏移量,否则该范围被二分。

import numpy as np
import time

# global variables; they will hold the precomputed pre-screening filters
preA, preB = {}, {}
CHUNK_SIZES = None

def sliding_argmax(data, K=2000):
    """compute the argmax of data over a sliding window of width K

    returns:
        indices  -- indices into data
        switches -- window offsets at which the maximum changes
                    (strictly speaking: where the index of the maximum changes)
                    excludes 0 but includes maximum offset (len(data)-K+1)

    see last line of compute_pre_screening_filter for a recipe to convert
    this representation to the vector of maxima
    """
    N = len(data)
    last = np.argmax(data[:K])
    indices = [last]
    while indices[-1] <= N - 1:
        ge = np.where(data[last + 1 : last + K + 1] > data[last])[0]
        if len(ge) == 0:
            if last + K >= N:
                break
            last += 1 + np.argmax(data[last + 1 : last + K + 1])
            indices.append(last)
        else:
            last += 1 + ge[0]
            indices.append(last)
    indices = np.array(indices)
    switches = np.where(data[indices[1:]] > data[indices[:-1]],
                        indices[1:] + (1-K), indices[:-1] + 1)
    return indices, np.r_[switches, [len(data)-K+1]]


def compute_pre_screening_filter(bound, n_offs):
    """compute pre-screening filter for point-wise upper bound

    given a K-vector of upper bounds B and K+n_offs-1-vector data
    compute K+n_offs-1-vector filter such that for each index j
    if for any offset 0 <= o < n_offs and index 0 <= i < K such that
    o + i = j, the inequality B_i >= data_j holds then filter_j >= data_j

    therefore the number of data points below filter is an upper bound for
    the maximum number of points below bound in any K-window in data
    """
    pad_l, pad_r = np.min(bound[:n_offs-1]), np.min(bound[1-n_offs:])
    padded = np.r_[pad_l+np.zeros(n_offs-1,), bound, pad_r+np.zeros(n_offs-1,)]
    indices, switches = sliding_argmax(padded, n_offs)
    return padded[indices].repeat(np.diff(np.r_[[0], switches]))


def compute_all_pre_screening_filters(upper, lower, min_chnk=5, dyads=6):
    """compute upper and lower pre-screening filters for data blocks of
    sizes K+n_offs-1 where
    n_offs = min_chnk, 2min_chnk, ..., 2^(dyads-1)min_chnk

    the result is stored in global variables preA and preB
    """
    global CHUNK_SIZES

    CHUNK_SIZES = min_chnk * 2**np.arange(dyads)
    preA[1] = upper
    preB[1] = lower
    for n in CHUNK_SIZES:
        preA[n] = compute_pre_screening_filter(upper, n)
        preB[n] = -compute_pre_screening_filter(-lower, n)


def test_bounds(block, counts, threshold=400):
    """test whether the windows fitting in the data block 'block' fall
    within the bounds using pre-screening for efficient bulk rejection

    array 'counts' will be overwritten with the counts of compliant samples
    note that accurate counts will only be returned for above threshold
    windows, because the analysis of bulk rejected windows is short-circuited

    also note that bulk rejection only works for 'well behaved' data and
    for example not on random numbers
    """
    N = len(counts)
    K = len(preA[1])
    r = N % CHUNK_SIZES[0]
    # chop up N into as large as possible chunks with matching pre computed
    # filters
    # start with small and work upwards
    counts[:r] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
                                   (block[l:l+K] >= preB[1]))
                  for l in range(r)]

    def bisect(block, counts):
        M = len(counts)
        cnts = np.count_nonzero((block <= preA[M]) & (block >= preB[M]))
        if cnts < threshold:
            counts[:] = cnts
            return
        elif M == CHUNK_SIZES[0]:
            counts[:] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
                                          (block[l:l+K] >= preB[1]))
                         for l in range(M)]
        else:
            M //= 2
            bisect(block[:-M], counts[:M])
            bisect(block[M:], counts[M:])

    N = N // CHUNK_SIZES[0]
    for M in CHUNK_SIZES:
        if N % 2:
            bisect(block[r:r+M+K-1], counts[r:r+M])
            r += M
        elif N == 0:
            return
        N //= 2
    else:
        for j in range(2*N):
            bisect(block[r:r+M+K-1], counts[r:r+M])
            r += M


def analyse(data, use_pre_screening=True, min_chnk=5, dyads=6,
            threshold=400):
    samples, upper, lower = data
    N, K = samples.shape[0], upper.shape[0]
    times = [time.time()]
    if use_pre_screening:
        compute_all_pre_screening_filters(upper, lower, min_chnk, dyads)
    times.append(time.time())
    # compute switching points of max and min for running normalisation
    upper_inds, upper_swp = sliding_argmax(samples[:, 1], K)
    lower_inds, lower_swp = sliding_argmax(-samples[:, 2], K)
    times.append(time.time())
    # sum columns
    ABCD = samples.sum(axis=-1)
    times.append(time.time())
    counts = np.empty((N-K+1,), dtype=int)
    # main loop
    # loop variables:
    offs = 0
    u_ind, u_scale, u_swp = 0, samples[upper_inds[0], 1], upper_swp[0]
    l_ind, l_scale, l_swp = 0, samples[lower_inds[0], 2], lower_swp[0]
    while True:
        # check which is switching next, min(C) or max(B)
        if u_swp > l_swp:
            # greedily take the largest block possible such that dif and Cmin
            # do not change
            block = (ABCD[offs:l_swp+K-1] - 4*l_scale) \
                    * (0.25 / (u_scale-l_scale))
            if use_pre_screening:
                test_bounds(block, counts[offs:l_swp], threshold=threshold)
            else:
                counts[offs:l_swp] = [
                    np.count_nonzero((block[l:l+K] <= upper) &
                                     (block[l:l+K] >= lower))
                    for l in range(l_swp - offs)]
            # book keeping
            l_ind += 1
            offs = l_swp
            l_swp = lower_swp[l_ind]
            l_scale = samples[lower_inds[l_ind], 2]
        else:
            block = (ABCD[offs:u_swp+K-1] - 4*l_scale) \
                    * (0.25 / (u_scale-l_scale))
            if use_pre_screening:
                test_bounds(block, counts[offs:u_swp], threshold=threshold)
            else:
                counts[offs:u_swp] = [
                    np.count_nonzero((block[l:l+K] <= upper) &
                                     (block[l:l+K] >= lower))
                    for l in range(u_swp - offs)]
            u_ind += 1
            if u_ind == len(upper_inds):
                assert u_swp == N-K+1
                break
            offs = u_swp
            u_swp = upper_swp[u_ind]
            u_scale = samples[upper_inds[u_ind], 1]
    times.append(time.time())
    return {'counts': counts, 'valid': np.where(counts >= 400)[0],
            'timings': np.diff(times)}


def master(mode='calibrate', data='fake', use_pre_screening=True, nrep=3,
           min_chnk=None, dyads=None):
    t = time.time()
    if data in ('fake', 'load'):
        data1 = np.loadtxt('data1.csv', delimiter=';', skiprows=1,
                           usecols=[1,2,3,4])
        data2a = np.loadtxt('data2a.csv', delimiter=';', skiprows=1,
                            usecols=[1])
        data2b = np.loadtxt('data2b.csv', delimiter=';', skiprows=1,
                            usecols=[1])
        if data == 'fake':
            data1 = np.tile(data1, (10, 1))
        threshold = 400
    elif data == 'random':
        data1 = np.random.random((102000, 4))
        data2b = np.random.random(2000)
        data2a = np.random.random(2000)
        threshold = 490
        if use_pre_screening or mode == 'calibrate':
            print('WARNING: pre-screening not efficient on artificial data')
    else:
        raise ValueError("data mode {} not recognised".format(data))
    data = data1, data2a, data2b
    t_load = time.time() - t
    if mode == 'calibrate':
        min_chnk = (2, 3, 4, 5, 6) if min_chnk is None else min_chnk
        dyads = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) if dyads is None else dyads
        timings = np.zeros((len(min_chnk), len(dyads)))
        print('max bisect  ' + ' '.join([
            '   n.a.' if dy == 0 else '{:7d}'.format(dy) for dy in dyads]),
              end='')
        for i, mc in enumerate(min_chnk):
            print('\nmin chunk {}'.format(mc), end=' ')
            for j, dy in enumerate(dyads):
                for k in range(nrep):
                    if dy == 0: # no pre-screening
                        timings[i, j] += analyse(
                            data, False, mc, dy, threshold)['timings'][3]
                    else:
                        timings[i, j] += analyse(
                            data, True, mc, dy, threshold)['timings'][3]
                timings[i, j] /= nrep
                print('{:7.3f}'.format(timings[i, j]), end=' ', flush=True)
        best_mc, best_dy = np.unravel_index(np.argmin(timings.ravel()),
                                            timings.shape)
        print('\nbest', min_chnk[best_mc], dyads[best_dy])
        return timings, min_chnk[best_mc], dyads[best_dy]
    if mode == 'run':
        min_chnk = 2 if min_chnk is None else min_chnk
        dyads = 5 if dyads is None else dyads
        res = analyse(data, use_pre_screening, min_chnk, dyads, threshold)
        times = np.r_[[t_load], res['timings']]
        print(list(zip(('load', 'precomp', 'max/min', 'ABCD', 'main'), times)))
        print('TOTAL', times.sum())
        return res

1
你可以直接使用res_indices在输出上,out[res_indices]会给出满足每个偏移量上标准的点数,其中这个数字为400或更多。你能否在更多的数据上测试脚本?我已经对你发布的样本进行了调整,但我很想知道它是否也适用于其他样本。 - Paul Panzer
我注意到一件事情,那就是当我显著缩小窗口尺寸时,速度会变慢,考虑到你在这里使用的方法,这可能是正常的。 - RaduS
你是指data2a/b的长度吗?它真的更慢还是只是不那么快?窗口有多小?无论如何,我对代码进行了改进。函数和变量名字选择得更加慎重,并添加了一些文档字符串。ALL_INMAX_BLUR已被替换为use_pre_screeningmin_chnkdyads,它们用于打开或关闭批量拒绝代码,并控制预筛选偏移块的大小。现在有一个master函数,支持calibrate模式,可以帮助您找到最佳参数,例如如果您想改变窗口大小。 - Paul Panzer
1
思考一下,这似乎是有道理的,因为其中一个主要的节省之处就是利用滑动最大值不会经常改变的事实。现在,你把窗口变得越小,这个事实就越不成立,所以虽然你的节省消失了,但你仍然需要承担所有那些复杂代码的开销。如果你要使用非常小的窗口,其他策略可能会更好... - Paul Panzer
1
我忍不住又进行了一些调整。新代码修复了两个小错误,并添加了一个新的 sliding_argmax,在我们标准的100万个样本测试中,我的设备上又节省了半秒钟。所以我们现在只需要2.5秒,其中0.8秒用于加载数据! - Paul Panzer
显示剩余6条评论

3

这个问题在技术上是不相关的(不是GPU),但我相信你会感兴趣。

有一个显而易见且相当大的节省:

预计算A + B + C + D(不在循环中,在整个数据上:data1.sum(axis=-1)),因为abcd = ((A+B+C+D) - 4Cmin) / (4dif)。这将节省相当多的操作。

令人惊讶的是,之前没有人注意到这一点;-)

编辑:

还有一件事,尽管我怀疑这只是你的例子,而不是你的真实数据:

目前大约有一半的data2adata2b小。在这些地方,您对abcd的条件不能同时成立,因此您根本不需要计算abcd

编辑:

我下面使用了另一个小技巧,但忘了提到:如果您在移动窗口上计算最大值(或最小值)。当您向右移动一个点时,例如,最大值有多大可能会改变?只有两件事可以改变它:右侧的新点更大(大约每次移动窗口长度发生一次,即使发生,您也会立即知道新的最大值),或者旧的最大值从左侧的窗口中掉落(也大约每次移动窗口长度发生一次)。仅在这种情况下,您必须搜索整个窗口以找到新的最大值。

编辑:

我无法抵制在tensorflow中尝试它。我没有GPU,所以你必须自己测试它的速度。将标记行中的“gpu”更改为“cpu”。

在CPU上,它大约比您的原始实现(即没有Divakar的调整)慢一半。请注意,我已经有权更改了输入矩阵为普通数组。目前,tensorflow有点像一个移动目标,因此请确保您拥有正确的版本。我使用Python3.6和tf 0.12.1如果您今天进行pip3安装tensorflow-gpu,则应该可能工作。

import numpy as np
import time
import tensorflow as tf

# currently the max/min code is sequential
# thus
parallel_iterations = 1
# but you can put this in a separate loop, precompute and then try and run
# the remainder of doTheMathTF with a larger parallel_iterations

# tensorflow is quite capricious about its data types
ddf = tf.float64
ddi = tf.int32

def worker(data1, data2a, data2b):
    ###################################
    # CHANGE cpu to gpu in next line! #
    ###################################
    with tf.device('/cpu:0'):
        g = tf.Graph ()
        with g.as_default():
            ABCD = tf.constant(data1.sum(axis=-1), dtype=ddf)
            B = tf.constant(data1[:, 1], dtype=ddf)
            C = tf.constant(data1[:, 2], dtype=ddf)
            window = tf.constant(len(data2a))
            N = tf.constant(data1.shape[0] - len(data2a) + 1, dtype=ddi)
            data2a = tf.constant(data2a, dtype=ddf)
            data2b = tf.constant(data2b, dtype=ddf)
            def doTheMathTF(i, Bmax, Bmaxind, Cmin, Cminind, out):
                # most of the time we can keep the old max/min
                Bmaxind = tf.cond(Bmaxind<i,
                                  lambda: i + tf.to_int32(
                                      tf.argmax(B[i:i+window], axis=0)),
                                  lambda: tf.cond(Bmax>B[i+window-1], 
                                                  lambda: Bmaxind, 
                                                  lambda: i+window-1))
                Cminind = tf.cond(Cminind<i,
                                  lambda: i + tf.to_int32(
                                      tf.argmin(C[i:i+window], axis=0)),
                                  lambda: tf.cond(Cmin<C[i+window-1],
                                                  lambda: Cminind,
                                                  lambda: i+window-1))
                Bmax = B[Bmaxind]
                Cmin = C[Cminind]
                abcd = (ABCD[i:i+window] - 4 * Cmin) * (1 / (4 * (Bmax-Cmin)))
                out = out.write(i, tf.to_int32(
                    tf.count_nonzero(tf.logical_and(abcd <= data2a,
                                                    abcd >= data2b))))
                return i + 1, Bmax, Bmaxind, Cmin, Cminind, out
            with tf.Session(graph=g) as sess:
                i, Bmaxind, Bmax, Cminind, Cmin, out = tf.while_loop(
                    lambda i, _1, _2, _3, _4, _5: i<N, doTheMathTF,
                    (tf.Variable(0, dtype=ddi), tf.Variable(0.0, dtype=ddf),
                     tf.Variable(-1, dtype=ddi),
                     tf.Variable(0.0, dtype=ddf), tf.Variable(-1, dtype=ddi),
                     tf.TensorArray(ddi, size=N)),
                    shape_invariants=None,
                    parallel_iterations=parallel_iterations,
                    back_prop=False)
                out = out.pack()
                sess.run(tf.initialize_all_variables())
                out, = sess.run((out,))
    return out

#Declare variables
batchSize = 2000
sampleSize = 50000#00
resultArray = []

#Create Sample Data
data1 = np.random.uniform(1, 100, (sampleSize + batchSize, 4))
data2a = np.random.uniform(0, 1, (batchSize,))
data2b = np.random.uniform(0, 1, (batchSize,))

t0 = time.time()
out = worker(data1, data2a, data2b)
print('Runtime (tensorflow):', time.time() - t0)


good_indices, = np.where(out >= 490)
res_tf = np.c_[good_indices, out[good_indices]]

def doTheMath(tmpData1, data2a, data2b):
    A = tmpData1[:, 0]
    B  = tmpData1[:,1]
    C   = tmpData1[:,2]
    D = tmpData1[:,3]
    Bmax = B.max()
    Cmin  = C.min()
    dif = (Bmax - Cmin)
    abcd = ((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ((C   - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
    return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

#Loop through the data
t0 = time.time()
for rowNr in  range(sampleSize+1):
    tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
    result = doTheMath(tmp_df, data2a, data2b)
    if (result >= 490):
        resultArray.append([rowNr , result])
print('Runtime (original):', time.time() - t0)
print(np.alltrue(np.array(resultArray)==res_tf))

谢谢你的回答,保罗。我在两台不同的电脑上测试了代码,都安装了Windows、Python3.5和tf 0.12.1。但出于某种原因,tensorflow版本比原版慢,即使我启用GPU,它仍然比原版慢。以下是一些统计数据:Pc1未安装GPU:Runtime (tensorflow): 9.321721315383911 Runtime (original): 3.7472479343414307 True Pc2安装并启用了GPU:Runtime (tensorflow): 72.36511659622192 Runtime (original): 3.5680108070373535 True - RaduS
@RaduS,恐怕我在使用tensorflow方面也只是稍微尝试了一下。就我所知,分析和调试都非常困难。我们再等几天吧,或许会有一些tensorflow专家来解决这个问题。如果没有的话,我可以再看看。你可以尝试使用此处提供的方法来了解是什么导致代码运行缓慢。很抱歉目前我帮不上更多忙。 - Paul Panzer
感谢@PaulPanzer的尝试。顺便提一下,我在问题编辑中上传了一个样本数据,如果您想测试它。 - RaduS
在真实的样本数据中,使用最小限制为400而不是490 @PaulPanzer - RaduS
我非常喜欢你的第一次编辑,关于 abcd = ((A+B+C+D) - 4Cmin) / (4dif)。是的,数据2a的一半会比数据2b小,但这只是在样本数据中,而不是真实数据。在真实数据中,数据2a始终高于数据2b。这些是上限和下限。 - RaduS
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接