How to efficiently vectorize hypergeometric CDF calculation?


I have a set of data frames, each with 4 columns and 1,000,000 rows. For each row, I want to run a hypergeometric test that takes the 4 values in those columns as input and returns a p-value (using the cumulative distribution function of the hypergeometric distribution).

I have tried two SciPy-based implementations (below), but neither scales well. Is there another way to do what I am doing more efficiently? I have a working solution written in R (at the bottom), but unfortunately the code has to be written in Python, because it will be used in an Airflow task that loads the data from a Postgres DB, and there is currently no Postgres hook for R.

Slow SciPy implementations

The sample data is created like this (using 10,000 rows rather than the full 52 * 1,000,000 rows):

import numpy as np
import pandas as pd
from scipy.stats import hypergeom
from timeit import default_timer as timer

n_rows = 10000
n_total = 1000
max_good = 400
max_sample = 200
s = 100

df = pd.DataFrame({
  'ngood': np.random.hypergeometric(ngood=max_good, nbad=n_total - max_good,
                                    nsample=s, size=n_rows),
  'nsamp': np.random.hypergeometric(ngood=max_sample, nbad=n_total - max_sample,
                                    nsample=s, size=n_rows)
})

df = df.assign(kgood=np.array([
    np.random.hypergeometric(ngood=ngood, nbad=n_total - ngood,
                             nsample=nsamp)
    for ngood, nsamp
    in zip(df.ngood, df.nsamp)
]))
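
As an aside, np.random.hypergeometric broadcasts over array parameters, so the kgood column could also be drawn in a single call (a sketch; generating the sample data is not the bottleneck here):

# Equivalent single-call draw: np.random.hypergeometric accepts
# array-valued ngood/nbad/nsample and broadcasts them row-wise.
df = df.assign(kgood=np.random.hypergeometric(
    ngood=df.ngood, nbad=n_total - df.ngood, nsample=df.nsamp))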

Slow implementation based on a list comprehension:

start = timer()
res = [
    hypergeom.cdf(k=ngood_found, M=n_total, n=ngood, N=nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))

[0.44247900002512713, 0.71587318053768023, 0.97215178135616498]
Elapsed time: 2.405838s

Slow implementation based on np.vectorize:

vectorized_test = np.vectorize(hypergeom.cdf, otypes=[float], excluded='M')
start = timer()
res = vectorized_test(k=df.kgood.values, M=n_total,
                      n=df.ngood.values, N=df.nsamp.values)
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))

[ 0.442479    0.71587318  0.97215178]
Elapsed time: 2.518952s
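
For what it's worth, hypergeom.cdf also accepts array arguments directly (it broadcasts them), but depending on the SciPy version the discrete CDF may still be evaluated point by point internally, so I would not expect this to scale any better:

# hypergeom.cdf broadcasts over array inputs on its own; whether this
# is faster than np.vectorize depends on how SciPy evaluates the
# discrete CDF internally.
res = hypergeom.cdf(k=df.kgood.values, M=n_total,
                    n=df.ngood.values, N=df.nsamp.values)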

Fast R implementation

The computation above finishes in milliseconds, which suggests that phyper is vectorized at the C level, as opposed to np.vectorize, which as far as I know is essentially a Python loop.

library(microbenchmark)

n_rows <- 10000
n_total <- 1000
max_good <- 400
max_sample <- 200
s <- 100

df <- data.frame(
  ngood = rhyper(nn=n_rows, m=max_good, n=n_total - max_good, k=s),
  nsamp = rhyper(nn=n_rows, m=max_sample, n=n_total - max_sample, k=s)
)

df$kgood <- rhyper(nn=n_rows, m=df$ngood, n=n_total - df$ngood, k=df$nsamp)

microbenchmark(
  res <- phyper(q = df$k, m = df$ngood, n = n_total - df$ngood, k=df$nsamp)
)

Unit: milliseconds
                                                                 expr      min      lq     mean   median       uq      max neval
phyper(q = df$k, m = df$ngood, n = n_total - df$ngood, k = df$nsamp) 2.984852 3.00838 3.350509 3.134745 3.439138 5.462694   100
1 Answer


A small improvement can be gained by caching the results of hypergeom.cdf, like this:

from functools import lru_cache

# @lru_cache(maxsize=16 * 1024)
# def fn(k, n, N):
#     return hypergeom.cdf(k=k, M=n_total, n=n, N=N)

data = {}
def fn(k, n, N):
    key = (k, n, N)
    if key not in data:
        val = hypergeom.cdf(k=k, M=n_total, n=n, N=N)
        data[key] = val
    else:
        val = data[key]
    return val

start = timer()
res = [
    fn(ngood_found, ngood, nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]

end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))

This is the result on my machine: Elapsed time: 0.279891s (0.315840s with lru_cache).

EDIT:

Actually, the bottleneck seems to be the evaluation of the hypergeometric CDF itself rather than the overhead of the for loop. To test this, I created a SWIG file _cdf.i for the function gsl_cdf_hypergeometric_P from the GSL package:
%module cdf
%{
#include "gsl/gsl_cdf.h"
%}
double gsl_cdf_hypergeometric_P(int, int, int, int);

This file is then "converted" into a package:

swig -c++ -python _cdf.i
g++ -fPIC -c _cdf_wrap.cxx -I${HOME}/venvs/p3/include/python3.5m
g++ -shared _cdf_wrap.o -o _cdf.so -lgsl
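
Assuming the build succeeds and produces _cdf.so, a quick smoke test with arbitrary values verifies that the module can be imported and called:

python -c "from cdf import gsl_cdf_hypergeometric_P; print(gsl_cdf_hypergeometric_P(5, 50, 950, 100))"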

This can then be used directly in the original example:

import numpy as np
import pandas as pd
from scipy.stats import hypergeom
from timeit import default_timer as timer
from cdf import gsl_cdf_hypergeometric_P

n_rows = 10000
n_total = 1000
max_good = 400
max_sample = 200
s = 100

df = pd.DataFrame({
  'ngood': np.random.hypergeometric(ngood=max_good, nbad=n_total - max_good,
                                    nsample=s, size=n_rows),
  'nsamp': np.random.hypergeometric(ngood=max_sample, nbad=n_total - max_sample,
                                    nsample=s, size=n_rows)
})

df = df.assign(kgood=np.array([
    np.random.hypergeometric(ngood=ngood, nbad=n_total - ngood,
                             nsample=nsamp)
    for ngood, nsamp
    in zip(df.ngood, df.nsamp)
]))

start = timer()
res = [
    hypergeom.cdf(k=ngood_found, M=n_total, n=ngood, N=nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))

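# GSL parameterizes the hypergeometric CDF as
# gsl_cdf_hypergeometric_P(k, n1, n2, t): the probability of drawing at
# most k "good" items when sampling t items out of n1 good and n2 bad
# ones, so SciPy's (M, n, N) maps to (n, M - n, N):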
def cdf(k, M, n, N):
    return gsl_cdf_hypergeometric_P(int(k), int(n), int(M-n), int(N))

start = timer()
res = [
    cdf(k=ngood_found, M=n_total, n=ngood, N=nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))

This produces:

[0.58605423287644209, 0.38055520197355552, 0.70597920363472055, 0.99728041338849138, 0.79797439957395955, 0.42245057292366844, 0.58627644982763727, 0.74819471224742817, 0.75121042470714849, 0.48561471798885397]
Elapsed time: 2.069916s
[0.5860542328771666, 0.38055520197384757, 0.7059792036350717, 0.997280413389543, 0.7979743995750694, 0.4224505729249291, 0.5862764498272103, 0.7481947122472634, 0.7512104247082603, 0.4856147179890127]
Elapsed time: 0.018253s

So even with a plain for loop, the speedup is very significant.
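
Incidentally, the caching idea from the beginning of this answer can also be expressed in bulk with pandas: evaluate each distinct (kgood, ngood, nsamp) triple once and merge the p-values back onto the full frame. A sketch, assuming df as defined in the question:

# Compute the CDF once per unique parameter combination, then map
# the results back onto all rows via a left merge.
uniq = df[['kgood', 'ngood', 'nsamp']].drop_duplicates()
uniq['p'] = [hypergeom.cdf(k=k, M=n_total, n=n, N=N)
             for k, n, N in zip(uniq.kgood, uniq.ngood, uniq.nsamp)]
res = df.merge(uniq, on=['kgood', 'ngood', 'nsamp'], how='left')['p'].values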

Clever! I tested it on one of the data frames, and it turned out to have only 109,096 unique rows out of 1,420,290 in total. - Backlin
@Backlin Perhaps it could be reduced even further by exploiting the symmetries of the hypergeometric distribution... - ewcz
@Backlin It seems that the loop is not what makes the Python version slow - I have updated the answer... - ewcz
Wow, this is exactly what I was looking for! I hadn't heard of SWIG before, but it looks really useful. - Backlin
I know this answer is quite old (but quite amazing!) - is this still the only way to do it? I'm not sure how to properly run the swig/g++ commands you listed above. - DrTchocky
