加速正态分布概率质量分配

5
我们有N个用户,每个用户的平均得分为P,其中每个得分都是0到1之间的单个值。由于这些得分存在一定的不确定性,我们需要使用已知密度为0.05的正态分布来分配每个点的质量。此外,我们需要将质量包裹在0和1周围,例如,一个位于0.95的点也会在0周围分配质量。我在下面提供了一个工作示例,它将正态分布分成D=50个bin。该示例使用Python的typing模块,但如果您愿意,可以忽略它。
from typing import List, Any
import numpy as np
import scipy.stats
import matplotlib.pyplot as plt

D = 50
BINS: List[float] = np.linspace(0, 1, D + 1).tolist()


def probability_mass(distribution: Any, x0: float, x1: float) -> float:
    """
    Computes the area under the distribution, wrapping at 1.
    The wrapping is done by adding the PDF at +- 1.
    """
    assert x1 > x0
    return (
        (distribution.cdf(x1) - distribution.cdf(x0))
        + (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
        + (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
    )


def point_density(x: float) -> List[float]:
    distribution: Any = scipy.stats.norm(loc=x, scale=0.05)
    density: List[float] = []
    for i in range(D):
        density.append(probability_mass(distribution, BINS[i], BINS[i + 1]))
    return density


def user_density(points: List[float]) -> Any:

    # Find the density of each point
    density: Any = np.array([point_density(p) for p in points])

    # Combine points and normalize
    combined = density.sum(axis=0)
    return combined / combined.sum()


if __name__ == "__main__":

    # Example for one user
    data: List[float] = [.05, .3, .5, .5]
    density = user_density(data)

    # Example for multiple users (N = 2)
    print([user_density(x) for x in [[.3, .5], [.7, .7, .7, .9]]])

    ### NB: THE REMAINING CODE IS FOR ILLUSTRATION ONLY!
    ### NB: THE IMPORTANT THING IS TO COMPUTE THE DENSITY FAST!
    middle: List[float] = []
    for i in range(D):
        middle.append((BINS[i] + BINS[i + 1]) / 2)
    plt.bar(x=middle, height=density, width=1.0 / D + 0.001)
    plt.xlim(0, 1)
    plt.xlabel("x")
    plt.ylabel("Density")
    plt.show()

matplotlib output

在这个例子中,N=1D=50P=4。然而,我们希望将这种方法扩展到N=10000P=100,同时尽可能快地运行。我不清楚如何对这种方法进行向量化处理。我们该如何最好地加速这个过程? 编辑 更快的解决方案可以具有略微不同的结果。例如,它可以近似正态分布,而不是使用精确的正态分布。 编辑2 我们只关心使用user_density()函数计算density。绘图只是为了说明方法。我们不关心图形本身:) 编辑3 请注意,P是每个用户的平均点数。某些用户可能会有更多或更少的点数。如果有帮助的话,您可以假设我们可以丢弃所有用户最多具有2 * P个点。在基准测试时忽略此部分也可以,只要解决方案能够处理灵活的用户点数即可。

你的分布为什么在“1”左右有个峰值?由于你在“0.5”处有两个点,直方图不应该在“0.5”处翻倍吗? - Quang Hoang
什么意思?直方图在 0.5 处是双倍 - 它是 0.08 而其他的大约是 0.04 - pir
1处的质量是有意为之的。请参见我的问题中的“我们需要将质量包裹在0和1周围,以便例如0.95处的点也会分配质量到0周围”的内容 :) - pir
你说 P 是每个用户的平均点数。P 是否遵循特定的分布?或者 P 是一个固定长度为 N 的数组,但可以假设 P.mean() 约为 100? - tstanisl
好的,我已经添加了 :) - pir
显示剩余6条评论
3个回答

5
您可以使用FFT并以numpy友好的格式创建数据,在最大情况下(N=10000,AVG[P]=100,D=50),可以获得低于50ms的速度。否则,速度将接近300毫秒。
这个想法是将一个以0为中心的普通分布与一系列Dirac delta卷积。
请参见下面的图像: enter image description here 使用循环卷积解决了两个问题。
- 自然地处理边缘的包装 - 可以通过FFT和Convolution Theorem有效计算
首先必须创建要复制的分布。函数mk_bell()创建了一个标准差为0.05、以0为中心的正态分布的直方图。分布围绕1周包裹着。在这里可以使用任意分布。计算分布的频谱用于快速卷积。

接下来创建了一个类似梳子的函数。峰值被放置在与用户密度峰值对应的索引处。例如:

peaks_location = [0.1, 0.3, 0.7]
D = 10

映射到

peak_index = (D * peak_location).astype(int) = [1, 3, 7]
dist = [0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0] # ones at [1, 3, 7]

您可以使用np.bincount()函数计算每个峰值位置的箱子索引,快速创建Diract Delta组合。
为了进一步加快速度,可以并行计算用户峰值的组合函数。
数组dist是形状为NxD的2D数组。它可以线性化为形状为(N*D)的1D数组。此更改后,位置[user_id, peak_index]上的元素将从索引user_id*D + peak_index可访问。
使用numpy友好的输入格式(如下所述),此操作易于向量化。
卷积定理表明,两个信号的卷积的频谱等于每个信号的频谱的乘积。
频谱由numpy.fft.rfft计算,它是专门用于实只信号(没有虚部)的快速傅里叶变换的变体。
Numpy允许使用一个命令计算较大矩阵的每行FFT。
接下来,通过简单的乘法和广播计算卷积的频谱。
接下来,通过numpy.fft.irfft中实现的反傅里叶变换将频谱计算回“时间”域。

为了充分利用numpy的速度,应避免使用可变大小的数据结构,并保持固定大小的数组。我建议将输入数据表示为三个数组。

  • uids用户的标识符,整数0..N-1
  • peaks峰值的位置
  • mass峰值的质量,目前为1/每个用户的峰值数

这种数据表示方式可以快速进行矢量化处理。例如:

user_data = [[0.1, 0.3], [0.5]]

映射到:

uids = [0, 0, 1] # 2 points for user_data[0], one from user_data[1]
peaks = [0.1, 0.3, 0.5] # serialized user_data
mass = [0.5, 0.5, 1] # scaling factors for each peak, 0.5 means 2 peaks for user 0

代码:

import numpy as np
import matplotlib.pyplot as plt
import time

def mk_bell(D, SIGMA):
    # computes normal distribution wrapped and centered at zero
    x = np.linspace(0, 1, D, endpoint=False);
    x = (x + 0.5) % 1 - 0.5
    bell = np.exp(-0.5*np.square(x / SIGMA))
    return bell / bell.sum()

def user_densities_by_fft(uids, peaks, mass, D, N=None):
    bell = mk_bell(D, 0.05).astype('f4')
    sbell = np.fft.rfft(bell)
    if N is None:
        N = uids.max() + 1
    # ensure that peaks are in [0..1) internal
    peaks = peaks - np.floor(peaks)
    # convert peak location from 0-1 to the indices
    pidx = (D * (peaks + uids)).astype('i4')
    dist = np.bincount(pidx, mass, N * D).reshape(N, D)
    # process all users at once with Convolution Theorem
    sdist = np.fft.rfft(dist)
    sdist *= sbell
    res = np.fft.irfft(sdist)

    return res

def generate_data(N, Pmean):
    # generateor for large data
    data = []
    for n in range(N):
        # select P uniformly from 1..2*Pmean
        P = np.random.randint(2 * Pmean) + 1
        # select peak locations
        chunk = np.random.uniform(size=P)
        data.append(chunk.tolist())
    return data

def make_data_numpy_friendly(data):
    uids = []
    chunks = []
    mass = []
    for uid, peaks in enumerate(data):
        uids.append(np.full(len(peaks), uid))
        mass.append(np.full(len(peaks), 1 / len(peaks)))
        chunks.append(peaks)
    return np.hstack(uids), np.hstack(chunks), np.hstack(mass)




D = 50

# demo for simple multi-distribution
data, N = [[0, .5], [.7, .7, .7, .9], [0.05, 0.3, 0.5, 0.5]], None
uids, peaks, mass = make_data_numpy_friendly(data)
dist = user_densities_by_fft(uids, peaks, mass, D, N)
plt.plot(dist.T)
plt.show()

# the actual measurement
N = 10000
P = 100
data = generate_data(N, P)

tic = time.time()
uids, peaks, mass = make_data_numpy_friendly(data)
toc = time.time()
print(f"make_data_numpy_friendly: {toc - tic}")

tic = time.time()
dist = user_densities_by_fft(uids, peaks, mass, D, N)
toc = time.time()
print(f"user_densities_by_fft: {toc - tic}")


我的四核Haswell机器的结果是:
make_data_numpy_friendly: 0.2733159065246582
user_densities_by_fft: 0.04064297676086426

处理数据耗时40毫秒。请注意,将数据处理为numpy友好格式所需的时间比分布实际计算花费的时间多6倍。 当涉及到循环时,Python速度非常慢。 因此,我强烈建议一开始就直接以numpy友好的方式生成输入数据。
以下是需要修复的一些问题:
精确度可以通过使用更大的D和下采样来改善 通过扩大峰值区间可以进一步提高峰值位置的准确性。 性能方面,scipy.fft提供了更多的FFT实现变体,可能更快。

这看起来很棒。惊奇地发现数学可以被用来大大加速事情!你能更详细地解释一下 mk_bell() 中的代码吗? - pir
同样的,你能解释/链接这背后的逻辑吗?我听说过卷积定理,但我不能完全理解正在发生的事情。 - pir
你能解释一下为什么在 peaks + uids 中使用了 uids 吗? - pir
1
@pir 添加了一些解释。感谢您接受答案。 - tstanisl
谢谢!我很想听更多关于如何提高峰值位置准确性的内容。使用这种实现方式,[0.0] 的分布不会在 0 周围对称。有没有可能以一种简单的方式来提高准确性呢? - pir
显示剩余2条评论

4
这是我的向量化方法:
data = np.array([0.05, 0.3, 0.5, 0.5])

np.random.seed(31415)
# random noise
randoms = np.random.normal(0,1,(len(data), int(1e5))) * 0.05

# samples with noise
samples = data[:,None] + randoms

# wrap [0,1]
samples = (samples % 1).ravel()


# histogram
hist, bins, patches = plt.hist(samples, bins=BINS, density=True)

输出:

输入图像描述


谢谢。不过,在你的回答中,density是在哪里计算的?请看我编辑后的问题。 - pir
上面的代码中密度以 hist 形式返回。如果您不关心绘图,可以将 plt.hist 替换为 np.histogramhist, bins = np.histogram(samples, bins=BINS) - Quang Hoang
我明白了,谢谢!您认为我该如何将其扩展到N = 10000?我应该用for循环吗? - pir
你是说你想要重复执行那个操作10k次,每次都有100个数据点吗?我的第一印象是循环并不是一个坏主意。 - Quang Hoang
是的,我们有大约10k x 100的数据集,并将对这些数据集应用此方法。 - pir

4

我成功地将每个包含100个数据点的样本处理时间从约4秒缩短至每个样本约1毫秒。

看起来你在模拟大量正态分布时花费了相当多的时间。既然你已经处理了非常大的样本量,那么可以直接使用标准正态分布值,因为这些值最终会被平均化。

我重现了你的方法(BaseMethod类),然后创建了一个优化后的类(OptimizedMethod类),并使用timeit装饰器进行了评估。我方法的主要区别在于以下代码行:

    # Generate a standardized set of values to add to each sample to simulate normal distribution
    self.norm_vals = np.array([norm.ppf(x / norm_val_n) * 0.05 for x in range(1, norm_val_n, 1)])

这将基于反正态累积分布函数创建一组通用的数据点,我们可以添加到每个数据点上以模拟该点周围的正态分布。然后,我们只需将数据重塑为用户样本,并在样本上运行np.histogram。

import numpy as np
import scipy.stats
from scipy.stats import norm
import time

# timeit decorator for evaluating performance
def timeit(method):
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        print('%r  %2.2f ms' % (method.__name__, (te - ts) * 1000 ))
        return result
    return timed

# Define Variables
N = 10000
D = 50
P = 100

# Generate sample data
np.random.seed(0)
data = np.random.rand(N, P)

# Run OP's method for comparison
class BaseMethod:
    def __init__(self, d=50):
        self.d = d
        self.bins = np.linspace(0, 1, d + 1).tolist()

    def probability_mass(self, distribution, x0, x1):
        """
        Computes the area under the distribution, wrapping at 1.
        The wrapping is done by adding the PDF at +- 1.
        """
        assert x1 > x0
        return (
            (distribution.cdf(x1) - distribution.cdf(x0))
            + (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
            + (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
        )

    def point_density(self, x):
        distribution = scipy.stats.norm(loc=x, scale=0.05)
        density = []
        for i in range(self.d):
            density.append(self.probability_mass(distribution, self.bins[i], self.bins[i + 1]))
        return density

    @timeit
    def base_user_density(self, data):
        n = data.shape[0]
        density = np.empty((n, self.d))
        for i in range(data.shape[0]):
            # Find the density of each point
            row_density = np.array([self.point_density(p) for p in data[i]])
            # Combine points and normalize
            combined = row_density.sum(axis=0)
            density[i, :] = combined / combined.sum()
        return density


base = BaseMethod(d=D)
# Only running base method on first 2 rows of data because it's slow
density = base.base_user_density(data[:2])
print(density[:2, :5])


class OptimizedMethod:

    def __init__(self, d=50, norm_val_n=50):
        self.d = d
        self.norm_val_n = norm_val_n
        self.bins = np.linspace(0, 1, d + 1).tolist()

        # Generate a standardized set of values to add to each sample to simulate normal distribution
        self.norm_vals = np.array([norm.ppf(x / norm_val_n) * 0.05 for x in range(1, norm_val_n, 1)])

    @timeit
    def optimized_user_density(self, data):

        samples = np.empty((data.shape[0], data.shape[1], self.norm_val_n - 1))
        # transform datapoints to normal distributions around datapoint
        for i in range(self.norm_vals.shape[0]):
            samples[:, :, i] = data + self.norm_vals[i]
        samples = samples.reshape(samples.shape[0], -1)

        #wrap around [0, 1]
        samples = samples % 1

        #loop over samples for density
        density = np.empty((data.shape[0], self.d))
        for i in range(samples.shape[0]):
            hist, bins = np.histogram(samples[i], bins=self.bins)
            density[i, :] = hist / hist.sum()

        return density


om = OptimizedMethod()
#Run optimized method on first 2 rows for apples to apples comparison
density = om.optimized_user_density(data[:2])
#Run optimized method on full data
density = om.optimized_user_density(data)
print(density[:2, :5])

在我的系统上,原始方法在2行数据上运行大约需要8.4秒的时间,而优化后的方法在2行数据上运行只需要1毫秒,并且在4.7秒内完成了10000行数据的处理。我打印出每种方法前两个样本的前五个值。

'base_user_density'  8415.03 ms
[[0.02176227 0.02278653 0.02422535 0.02597123 0.02745976]
 [0.0175103  0.01638513 0.01524853 0.01432158 0.01391156]]
'optimized_user_density'  1.09 ms
'optimized_user_density'  4755.49 ms
[[0.02142857 0.02244898 0.02530612 0.02612245 0.0277551 ]
 [0.01673469 0.01653061 0.01510204 0.01428571 0.01326531]]

生成器可以在概率质量、点密度和基础用户密度中使用吗? - VanBantam
谢谢。这太容易理解了,真是太棒了! - pir

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接