Cython:使prange并行化线程安全

4
这里是Cython入门指南。我正在尝试通过使用多线程来加速计算某个特定成对统计量(在几个bin中)。具体而言,我正在使用cython.parallel中的prange,它内部使用openMP。
以下是最小化示例,用Jupyter笔记本的Cython魔法进行编译。
笔记本设置:
%load_ext Cython
import numpy as np

Cython 代码:

%%cython --compile-args=-fopenmp --link-args=-fopenmp -a

from cython cimport boundscheck
import numpy as np
from cython.parallel cimport prange, parallel

@boundscheck(False)
def my_parallel_statistic(double[:] X, double[:,::1] bins, int num_threads):

    cdef: 
        int N = X.shape[0]
        int nbins = bins.shape[0]
        double Xij,Yij
        double[:] Z = np.zeros(nbins,dtype=np.float64)
        int i,j,b

    with nogil, parallel(num_threads=num_threads):
        for i in prange(N,schedule='static',chunksize=1):
            for j in range(i):
                #some pairwise quantities
                Xij = X[i]-X[j]
                Yij = 0.5*(X[i]+X[j])
                #check if in bin
                for b in range(nbins):
                    if (Xij < bins[b,0]) or (Xij > bins[b,1]):
                        continue
                    Z[b] += Xij*Yij

    return np.asarray(Z)

模拟数据和桶

X = np.random.rand(10000)
bin_edges = np.linspace(0.,1,11)
bins = np.array([bin_edges[:-1],bin_edges[1:]]).T
bins = bins.copy(order='C')

计时通过

%timeit my_parallel_statistic(X,bins,1)
%timeit my_parallel_statistic(X,bins,4)

产量
1 loop, best of 3: 728 ms per loop
1 loop, best of 3: 330 ms per loop

这不是一个完美的比例缩放,但这不是问题的重点。(如果您有超出添加常规修饰符或微调prange参数的建议,请让我知道。)
然而,这个计算显然不是线程安全的。
Z1 = my_parallel_statistic(X,bins,1)
Z4 = my_parallel_statistic(X,bins,4)
np.allclose(Z1,Z4)

这段文字揭示了两个结果之间的显著差异(在此示例中高达20%)。

我强烈怀疑问题在于多个线程可以执行。

Z[b] += Xij*Yij

同时,我不知道如何在不牺牲加速的情况下解决这个问题。

在我的实际使用案例中,计算Xij和Yij更加昂贵,因此我希望只对每对进行一次计算。另外,预先计算并存储所有对的Xij和Yij,然后简单地通过箱循环也不是一个好的选择,因为N可能会变得非常大,我无法在内存中存储100,000 x 100,000的numpy数组(实际上这是将其重写为Cython的主要动机!)。

系统信息(按照评论建议添加):

CPU(s): 8
Model name: Intel(R) Core(TM) i7-4790K CPU @ 4.00GHz
OS: Red Hat Linux v6.8
Memory: 16 GB

每个线程中的操作是否真正独立于任何其他操作?哪一个先运行有关系吗?如果存在任何依赖,这不是并行操作的好选择。 - hpaulj
只要每个线程都创建自己的Xij和Yij,它们就应该是独立的(但也许这就是问题所在?)就数学而言,对于每对(i,j),Xij和Yij是独立计算的,因此对统计量Z的贡献也是独立的。 - user4319496
1
感谢您在问题中包含了如此出色的 [mcve]!这样一个经过深入研究和制定的问题在 SO 上非常罕见。唯一可能需要包含的是您的 CPU 型号和内存以评论性能,但这并不是问题的主要点。 - Zulan
好的,已经添加了。 - user4319496
2个回答

5

是的,Z[b] += Xij*Yij确实存在竞态条件。

有几种方法可以使其成为原子操作临界区。除了Cython的实现问题外,由于共享的Z向量会出现伪共享,因此在任何情况下都会导致性能不佳。

因此,更好的选择是为每个线程保留一个私有数组。又有几种(非)选项。可以使用私有的malloc指针,但我想坚持使用np。内存切片无法分配为私有变量。一个二维的(num_threads, nbins)数组也可以工作,但由于某种原因会生成非常复杂而低效的数组索引代码。这样做虽然可行,但速度较慢且不易扩展。

使用手动“2D”索引的平坦numpy数组效果很好。通过避免将数组的私有部分填充到64字节(这是典型的缓存行大小),您可以获得一些额外的性能。这避免了核之间的伪共享。私有部分只是在并行区域外逐个累加。

%%cython --compile-args=-fopenmp --link-args=-fopenmp -a
from cython cimport boundscheck
import numpy as np
from cython.parallel cimport prange, parallel
cimport openmp

@boundscheck(False)
def my_parallel_statistic(double[:] X, double[:,::1] bins, int num_threads):

    cdef: 
        int N = X.shape[0]
        int nbins = bins.shape[0]
        double Xij,Yij
        # pad local data to 64 byte avoid false sharing of cache-lines
        int nbins_padded = (((nbins - 1) // 8) + 1) * 8
        double[:] Z_local = np.zeros(nbins_padded * num_threads,dtype=np.float64)
        double[:] Z = np.zeros(nbins)
        int i,j,b, bb, tid

    with nogil, parallel(num_threads=num_threads):
        tid = openmp.omp_get_thread_num()
        for i in prange(N,schedule='static',chunksize=1):
            for j in range(i):
                #some pairwise quantities
                Xij = X[i]-X[j]
                Yij = 0.5*(X[i]+X[j])
                #check if in bin
                for b in range(nbins):
                    if (Xij < bins[b,0]) or (Xij > bins[b,1]):
                        continue
                    Z_local[tid * nbins_padded + b] += Xij*Yij
    for tid in range(num_threads):
        for bb in range(nbins):
            Z[bb] += Z_local[tid * nbins_padded + bb]


    return np.asarray(Z)

这在我的四核机器上表现得非常好,速度为720毫秒/191毫秒,加速比为3.6。剩下的差距可能是由于超频模式造成的。我现在没有合适的测试机器。


感谢您提供了出色的答案,不仅给出了修复版本,还提供了背景信息以便理解!PS:我在您的代码中修复了一个小错误:在结尾的串行循环中,索引应该是bb而不是b(修复等待审核/批准)。 - user4319496

1
您说得没错,对Z的访问存在竞争条件。
您最好定义num_threads个Z的副本,如cdef double[:] Z = np.zeros((num_threads, nbins), dtype=np.float64),并在prange循环后沿着axis 0执行求和操作。
return np.sum(Z, axis=0)

Cython代码可以在并行区域中使用with gil语句,但仅用于错误处理。您可以查看一般的C代码,以查看是否会触发原子OpenMP操作,但我怀疑这一点。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接