如何进行无重复抽样并每次重新加权(条件抽样)?

3
考虑一个具有权重的N行数据集。这是基本算法:
  1. 将权重归一化,使它们总和为1。
  2. 将权重备份到另一列中以记录样本概率
  3. 在给定样本概率的情况下,随机选择1行(不重复),并将其添加到样本数据集中
  4. 从原始数据集中删除所选权重,并通过将剩余行的权重归一化来重新计算样本概率
  5. 重复步骤3和4,直到样本中的权重总和达到或超过阈值(假设为0.6)
以下是一个玩具示例:
import pandas as pd
import numpy as np

def sampler(n):
    df = pd.DataFrame(np.random.rand(n), columns=['weight'])
    df['weight'] = df['weight']/df['weight'].sum()
    df['samp_prob'] = df['weight']

    samps = pd.DataFrame(columns=['weight'])

    while True:
        choice = np.random.choice(df.index, 1, replace=False, p=df['samp_prob'])[0]
        samps.loc[choice, 'weight'] = df.loc[choice, 'weight']
        df.drop(choice, axis=0, inplace=True)
        df['samp_prob'] = df['weight']/df['weight'].sum()
        if samps['weight'].sum() >= 0.6:
            break
    return samps

玩具示例的问题在于随着 n 的增加,运行时间呈指数级增长:

Exponential Growth

2个回答

2

起始方法

以下是一些观察结果:

  • 每次迭代删除行以创建新数据框并不能提高性能。
  • 似乎不容易向量化,但应该很容易使用底层数组数据来提高性能。想法是使用掩码避免重新创建数据框或数组。起初,我们将使用两列数组,对应于名为:'weights''samp_prob'的列。

因此,考虑到这些问题,起始方法应该是这样的 -

def sampler2(n):
    a = np.random.rand(n,2)
    a[:,0] /= a[:,0].sum()
    a[:,1] = a[:,0]
    N = len(a)

    idx = np.arange(N)
    mask = np.ones(N,dtype=bool)
    while True:
        choice = np.random.choice(idx[mask], 1, replace=False, p=a[mask,1])[0]
        mask[choice] = 0
        a_masked = a[mask,0]
        a[mask,1] = a_masked/a_masked.sum()
        if a[~mask,0].sum() >= 0.6:
            break
    out = a[~mask,0]
    return out

改进 #1

后来的观察发现,数组的第一列在迭代过程中没有变化。因此,我们可以针对第一列的掩码求和进行优化,通过预先计算总和,在每次迭代中,a[~mask,0].sum() 就只需用总和减去 a_masked.sum() 即可。下面是第一个改进 -

def sampler3(n):
    a = np.random.rand(n,2)
    a[:,0] /= a[:,0].sum()
    a[:,1] = a[:,0]
    N = len(a)

    idx = np.arange(N)
    mask = np.ones(N,dtype=bool)
    a0_sum = a[:,0].sum()
    while True:
        choice = np.random.choice(idx[mask], 1, replace=False, p=a[mask,1])[0]
        mask[choice] = 0
        a_masked = a[mask,0]
        a_masked_sum = a_masked.sum()
        a[mask,1] = a_masked/a_masked_sum
        if a0_sum - a_masked_sum >= 0.6:
            break
    out = a[~mask,0]
    return out

改进 #2

现在,如果第一列在迭代过程中没有发生变化,使用两个单独的数组来切片和掩码二维数组的列可能会更好。这样就得到了一个修改后的版本,如下所示 -

def sampler4(n):
    a = np.random.rand(n)
    a /= a.sum()
    b = a.copy()
    N = len(a)

    idx = np.arange(N)
    mask = np.ones(N,dtype=bool)
    a_sum = a.sum()
    while True:
        choice = np.random.choice(idx[mask], 1, replace=False, p=b[mask])[0]
        mask[choice] = 0
        a_masked = a[mask]
        a_masked_sum = a_masked.sum()
        b[mask] = a_masked/a_masked_sum
        if a_sum - a_masked_sum >= 0.6:
            break
    out = a[~mask]
    return out

运行时测试 -

In [250]: n = 1000

In [251]: %timeit sampler(n) # original app
     ...: %timeit sampler2(n)
     ...: %timeit sampler3(n)
     ...: %timeit sampler4(n)
1 loop, best of 3: 655 ms per loop
10 loops, best of 3: 50 ms per loop
10 loops, best of 3: 44.9 ms per loop
10 loops, best of 3: 38.4 ms per loop

In [252]: n = 2000

In [253]: %timeit sampler(n) # original app
     ...: %timeit sampler2(n)
     ...: %timeit sampler3(n)
     ...: %timeit sampler4(n)
1 loop, best of 3: 1.32 s per loop
10 loops, best of 3: 134 ms per loop
10 loops, best of 3: 119 ms per loop
10 loops, best of 3: 100 ms per loop

因此,我们在n=1000n=2000的情况下,使用最终版本相比原始方法获得了17x+13x+的加速效果!


太棒了!我有强烈的感觉你会对这个问题有一个很好的答案。在我休息之前,我也在优化我的代码。我的代码演变成使用掩码和组合总和减去掩码总和,但我仍然在使用DataFrame(因为真实数据集有大约150列,并且在DataFrame中)。我得到了1000行的运行时间约为250毫秒。 - Kartik
@Kartik 原始方法的 2/3?数据框架被认为是缓慢的,因此使用数组数据和掩码对我来说是有意义的。 - Divakar
抱歉,那是错误的。我编辑了原来的评论。在原始值和sampler2之间,我得到了大约2/3。仅使用掩码和预计算总和就提供了巨大的加速。在DataFrame中删除行是最耗费时间的。 - Kartik
@Kartik 我明白了。sampler4 呢?在我这边看起来好像是最高效的。我想,如果你能像最初在问题中发布的那样,用一个漂亮的图形将 sample1-4 在同一图中展示出来,那会是一个很好的视角。 - Divakar
sampler4具有最佳性能。我将编辑我的问题,包括所有版本的sampler的图表。通常,对于这样的算法,我会编写一个MWE,并在此处发布问题,如果我认为我需要一些额外的帮助,并且如果该问题会引起一些成员的兴趣。然后,我继续优化我的MWE,直到我对性能感到满意或者感到疲倦。使用掩码是显而易见的(它消除了第二个“DF”,并消除了“loc”扩展和“drop”),所以我这样做了,并在晚上称之为结束。而在早上,你的代码跑得最快! - Kartik

0

我认为你可以重写这个while循环,以便在单次遍历中完成:

while True:
    choice = np.random.choice(df.index, 1, replace=False, p=df['samp_prob'])[0]
    samps.loc[choice, 'weight'] = df.loc[choice, 'weight']
    df.drop(choice, axis=0, inplace=True)
    df['samp_prob'] = df['weight']/df['weight'].sum()
    if samps['weight'].sum() >= 0.6:
        break

返回的翻译文本仅为以下类似内容:
n = len(df.index)
ind = np.random.choice(n, n, replace=False, p=df["samp_prob"])
res = df.iloc[ind]
i = (res.cumsum() >= 0.6).idxmax()  # first index that satisfies .sum() >= 0.6
samps = res.iloc[:i+1]

关键部分在于choice可以选择多个元素(甚至整个数组),同时仍然尊重概率。cumsum允许您在通过0.6阈值后截断。


在这个例子中,你可以看到数组是随机选择的,但是4很可能被选择在靠近顶部的位置。
In [11]: np.random.choice(5, 5, replace=False, p=[0.05, 0.05, 0.1, 0.2, 0.6])
Out[11]: array([0, 4, 3, 2, 1])

In [12]: np.random.choice(5, 5, replace=False, p=[0.05, 0.05, 0.1, 0.2, 0.6])
Out[12]: array([3, 4, 1, 2, 0])

In [13]: np.random.choice(5, 5, replace=False, p=[0.05, 0.05, 0.1, 0.2, 0.6])
Out[13]: array([0, 4, 3, 1, 2])

In [14]: np.random.choice(5, 5, replace=False, p=[0.05, 0.05, 0.1, 0.2, 0.6])
Out[14]: array([4, 3, 0, 2, 1])

In [15]: np.random.choice(5, 5, replace=False, p=[0.05, 0.05, 0.1, 0.2, 0.6])
Out[15]: array([4, 2, 3, 0, 1])

In [16]: np.random.choice(5, 5, replace=False, p=[0.05, 0.05, 0.1, 0.2, 0.6])
Out[16]: array([3, 4, 2, 0, 1])

注意:replace=False 保证了概率被“重新加权”,因为它不会再次被选中。

你能指出那个证明numpy.random.choice像问题中的算法一样重新加权的来源吗?有两种方法可以实现这一点,一种是仅丢弃已选择的元素和相关概率,并继续处理剩余部分而不进行重新归一化。另一种方法是进行重新归一化,这是使用非参数条件概率进行重采样的正确方式。我可以证明,如果不进行重新归一化,结果将会有偏差(具有较低初始概率的项将会受到歧视)。你可以通过对你回答中的示例进行重新归一化来观察到这一点。 - Kartik
澄清一下,应该很少选择概率较低的元素。但重新归一化确保概率增加。在一个有3个元素的例子中,概率为[0.6,0.3,0.1],如果选择第一个元素,则新的概率为[0.75,0.25],这会改变与[0.3,0.1]相比的选择几率。 - Kartik
根据源代码(https://github.com/numpy/numpy/blob/master/numpy/random/mtrand/mtrand.pyx),只有在无法找到足够的唯一索引以满足传递的大小时,`numpy.random.choice` 才会重新加权。这对于真正的条件抽样是不正确的,因此我不能接受你的答案。 - Kartik

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接