从一个具有权重的列表中随机选择k个元素

79

在不考虑权重(等概率)情况下进行选择的方法可以在这里清晰地描述。

我想知道是否有一种方法可以将这种方法转换为加权方法。

我对其他方法也很感兴趣。

更新:抽样替换


3
采样是有替换还是无替换的? - Amro
2
无论如何,这是 https://dev59.com/kHRC5IYBdhLWcg3wRO3k 的重复。 - Jason Orendorff
1
@Jason,我在询问一种将那种优雅的方法转换为加权方法的方式,这并不完全是重复的。 - nimcap
这个问题会被问多少次? - Jérémie
4
按照元素被选中的概率成比例的权重进行无放回抽样并不是一件简单的任务,近期有很好的相关研究。例如请参见:http://books.google.com.br/books/about/Sampling_Algorithms.html?id=2auW1rVAwGMC&redir_esc=y - Ferdinand.kraft
显示剩余2条评论
14个回答

71

如果采样是有放回的,您可以使用以下算法(在Python中实现):

import random

items = [(10, "low"),
         (100, "mid"),
         (890, "large")]

def weighted_sample(items, n):
    total = float(sum(w for w, v in items))
    i = 0
    w, v = items[0]
    while n:
        x = total * (1 - random.random() ** (1.0 / n))
        total -= x
        while x > w:
            x -= w
            i += 1
            w, v = items[i]
        w -= x
        yield v
        n -= 1

这是O(n+m),其中m是项目数量。

为什么会这样运行?它基于以下算法:

def n_random_numbers_decreasing(v, n):
    """Like reversed(sorted(v * random() for i in range(n))),
    but faster because we avoid sorting."""
    while n:
        v *= random.random() ** (1.0 / n)
        yield v
        n -= 1

函数weighted_sample就是将这个算法与遍历items列表的步骤结合在了一起来挑选那些被随机数所选择的元素。
这样做的原因是当n个0..v的随机数都小于某个值z时,概率是P = (z/v)n。解出z,你就得到了z = vP1/n。用一个随机数替代P,就可以选出符合正确分布的最大数字;我们只需要重复这个过程,就可以选择出其他所有数字。 如果不进行替换抽样,则可以将所有元素放入一个二叉堆中,其中每个节点缓存该子堆中所有项目权重的总和。构建堆的时间复杂度为O(m)。从堆中选择一个随机项目,并尊重权重,时间复杂度为O(log m)。移除该项目并更新缓存的总和同样是O(log m)。因此,您可以在O(m + n log m)时间内选择n个项目。
(注:这里的“权重”指每次选择元素时,剩余的可能性都按其权重的比例进行选择。这并不意味着元素按其权重的概率出现在输出中。)
下面是丰富注释的实现代码:
import random

class Node:
    # Each node in the heap has a weight, value, and total weight.
    # The total weight, self.tw, is self.w plus the weight of any children.
    __slots__ = ['w', 'v', 'tw']
    def __init__(self, w, v, tw):
        self.w, self.v, self.tw = w, v, tw

def rws_heap(items):
    # h is the heap. It's like a binary tree that lives in an array.
    # It has a Node for each pair in `items`. h[1] is the root. Each
    # other Node h[i] has a parent at h[i>>1]. Each node has up to 2
    # children, h[i<<1] and h[(i<<1)+1].  To get this nice simple
    # arithmetic, we have to leave h[0] vacant.
    h = [None]                          # leave h[0] vacant
    for w, v in items:
        h.append(Node(w, v, w))
    for i in range(len(h) - 1, 1, -1):  # total up the tws
        h[i>>1].tw += h[i].tw           # add h[i]'s total to its parent
    return h

def rws_heap_pop(h):
    gas = h[1].tw * random.random()     # start with a random amount of gas

    i = 1                     # start driving at the root
    while gas >= h[i].w:      # while we have enough gas to get past node i:
        gas -= h[i].w         #   drive past node i
        i <<= 1               #   move to first child
        if gas >= h[i].tw:    #   if we have enough gas:
            gas -= h[i].tw    #     drive past first child and descendants
            i += 1            #     move to second child
    w = h[i].w                # out of gas! h[i] is the selected node.
    v = h[i].v

    h[i].w = 0                # make sure this node isn't chosen again
    while i:                  # fix up total weights
        h[i].tw -= w
        i >>= 1
    return v

def random_weighted_sample_no_replacement(items, n):
    heap = rws_heap(items)              # just make a heap...
    for i in range(n):
        yield rws_heap_pop(heap)        # and pop n items off it.

1
“修复”策略可能是最快的。您可以通过在每个“节点”中存储原始值而不是单独的字典来加快速度。 - Jason Orendorff
1
JavaScript端口(如果有需要):https://gist.github.com/seejohnrun/5291246 - John
如果您能解释一下为什么不替换算法会让您感到困扰,而替换算法则不会,我会更好地理解您的担忧。两种算法都使用相同的“权重”解释(即它是在任何给定选择中被选中的概率的度量,而不是在n次选择后总体被选中的概率)。 - Jason Orendorff
你的替换算法看起来很好,因为“weight”意味着“被选择的相对概率”,我认为这就是人们在这种情况下所说的“weight”的含义。但是,我认为你的无替换算法存在问题,因为“weight”并不意味着那个。我不确定在你的无替换算法中,“weight”是否有一个有用的含义,这就是为什么我要问是否有这样一个含义。 - ech
1
呵呵!你完全正确。0.5在第二个集合中。好的,我可以使用>= - Jason Orendorff
显示剩余34条评论

45
如果采样是有放回的,使用轮盘赌选择技术(通常用于遗传算法):
  1. 将权重排序
  2. 计算累积权重
  3. [0,1]*totalWeight中选择一个随机数
  4. 找到该数字所在的区间
  5. 选择相应区间的元素
  6. 重复k

alt text

如果采样是无放回的,您可以通过在每次迭代后从列表中删除所选元素,然后重新归一化权重,使它们的总和为1(有效的概率分布函数)来调整上述技术。

15
+1,这个方案非常清晰易懂。但请注意,轮盘赌算法的时间复杂度为O(nlog m + m),其中n是样本数量,m是项目数量。如果省略排序并在第4步中进行二分查找,则可以达到此时间复杂度。此外,它需要 O(m) 的空间来存储累积权重。在我的答案中,有一个14行的函数,它以O(n+m)时间复杂度和O(1)空间复杂度执行相同的操作。 - Jason Orendorff
1
如果我必须删除选定的元素,我需要复制整个列表,我假设我们不允许对输入列表进行任何修改,这是昂贵的。 - nimcap
1
你认为使用树状数组能在这里有所帮助吗? - sw.
2
不提供替换方法并不好。请搜索“带有不等概率的系统抽样”。有一个O(n)算法,无需重新计算权重。 - Pawel
如果采样是无替换的,您可以通过在每次迭代后从列表中删除所选元素,然后重新归一化权重,使它们的总和为1(有效概率分布函数)来适应上述技术。这是一个诱人的算法,但如果您试图证明它是正确的,您会发现它实际上会扭曲分布。然而,有一种非常聪明的方法可以将轮盘选择适应无替换采样。 - Stef
显示剩余3条评论

34

我知道这是一个非常古老的问题,但我认为有一个巧妙的技巧可以在O(n)时间内完成,只需应用一些数学!

指数分布具有两个非常有用的属性。

  1. 给定不同速率参数的不同指数分布的n个样本,则给定样本是最小值的概率等于其速率参数除以所有速率参数之和。

  2. 它是“无记忆”的。因此,如果您已经知道最小值,则其余元素中任何一个是第二个最小值的概率与如果真正的最小值被删除(并且永远不会生成),则该元素将成为新最小值的概率相同。这似乎很明显,但我认为由于某些条件概率问题,它可能不适用于其他分布。

使用事实1,我们知道可以通过生成具有权重等于速率参数的这些指数分布样本,然后选择具有最小值的样本来选择单个元素。

使用事实2,我们知道我们不必重新生成指数样本。而是为每个元素生成一个样本,并取具有最低样本的k个元素。

找到最低的k可以在O(n)中完成。使用快速选择算法找到第k个元素,然后简单地通过所有元素进行另一次遍历,并输出所有低于第k个的元素。

有用的注释:如果您没有立即访问库以生成指数分布样本,则可以通过以下方法轻松完成: -ln(rand())/weight


1
我认为这是多年后唯一正确的答案。我曾经找到过这个正确的方法,但从未记得输入这里。我会仔细阅读你的答案并接受它。 - nimcap
3
这个方法有参考文献吗?我找到了一个名为“加权随机抽样”的文件(链接为https://docdro.id/XTglD09),它有类似的想法。虽然我看出这种方法的逻辑,但我错过了一个参考文献来证明这是正确的分布。 - Royi
1
不好意思,我没有;这是我自己想出来的东西。所以你对它持怀疑态度是正确的。但如果比我更聪明的人没有发现它并且给予比堆栈溢出帖子更严格的处理,我会感到惊讶。 - Joe K
我在 JavaScript 中实现了这个,但它没有给出预期的结果:https://jsfiddle.net/gasparl/kamub5rq/18/ - 有任何想法为什么吗? - gaspar
由于这个问题/答案仍然受到一些关注,我查看了数学/计算机科学文献中是否有这个已知的算法(如果你能称之为算法的话)。它似乎偶尔会出现。我找到的最早的是这篇论文:https://utopia.duth.gr/~pefraimi/research/data/2007EncOfAlg.pdf,作者是Efraimidis和Spirakis。它给出了上述算法的一个小变种(选择最大的`rand()^(1/weight)`而不是最小的`-ln(rand())/weight`),但似乎没有证明其正确性,也没有明确说明它是一项发明还是一个先前的参考。 - undefined

3

我用Ruby完成了这个

https://github.com/fl00r/pickup

require 'pickup'
pond = {
  "selmon"  => 1,
  "carp" => 4,
  "crucian"  => 3,
  "herring" => 6,
  "sturgeon" => 8,
  "gudgeon" => 10,
  "minnow" => 20
}
pickup = Pickup.new(pond, uniq: true)
pickup.pick(3)
#=> [ "gudgeon", "herring", "minnow" ]
pickup.pick
#=> "herring"
pickup.pick
#=> "gudgeon"
pickup.pick
#=> "sturgeon"

这个版本返回的结果与Jason Orendorff的帖子不一致。具体来说,在像从[1,1,1,1,9996]中选择(4, unique)这样的权重时,低权重项的结果不均匀。 - robbat2

1

这是一个来自geodns的Go实现:

package foo

import (
    "log"
    "math/rand"
)

type server struct {
    Weight int
    data   interface{}
}

func foo(servers []server) {
    // servers list is already sorted by the Weight attribute

    // number of items to pick
    max := 4

    result := make([]server, max)

    sum := 0
    for _, r := range servers {
        sum += r.Weight
    }

    for si := 0; si < max; si++ {
        n := rand.Intn(sum + 1)
        s := 0

        for i := range servers {
            s += int(servers[i].Weight)
            if s >= n {
                log.Println("Picked record", i, servers[i])
                sum -= servers[i].Weight
                result[si] = servers[i]

                // remove the server from the list
                servers = append(servers[:i], servers[i+1:]...)
                break
            }
        }
    }

    return result
}

1
如果您想从一个有权重的集合中选择x个元素,且不重复选择,使得每个元素被选中的概率与其权重成比例:
import random

def weighted_choose_subset(weighted_set, count):
    """Return a random sample of count elements from a weighted set.

    weighted_set should be a sequence of tuples of the form 
    (item, weight), for example:  [('a', 1), ('b', 2), ('c', 3)]

    Each element from weighted_set shows up at most once in the
    result, and the relative likelihood of two particular elements
    showing up is equal to the ratio of their weights.

    This works as follows:

    1.) Line up the items along the number line from [0, the sum
    of all weights) such that each item occupies a segment of
    length equal to its weight.

    2.) Randomly pick a number "start" in the range [0, total
    weight / count).

    3.) Find all the points "start + n/count" (for all integers n
    such that the point is within our segments) and yield the set
    containing the items marked by those points.

    Note that this implementation may not return each possible
    subset.  For example, with the input ([('a': 1), ('b': 1),
    ('c': 1), ('d': 1)], 2), it may only produce the sets ['a',
    'c'] and ['b', 'd'], but it will do so such that the weights
    are respected.

    This implementation only works for nonnegative integral
    weights.  The highest weight in the input set must be less
    than the total weight divided by the count; otherwise it would
    be impossible to respect the weights while never returning
    that element more than once per invocation.
    """
    if count == 0:
        return []

    total_weight = 0
    max_weight = 0
    borders = []
    for item, weight in weighted_set:
        if weight < 0:
            raise RuntimeError("All weights must be positive integers")
        # Scale up weights so dividing total_weight / count doesn't truncate:
        weight *= count
        total_weight += weight
        borders.append(total_weight)
        max_weight = max(max_weight, weight)

    step = int(total_weight / count)

    if max_weight > step:
        raise RuntimeError(
            "Each weight must be less than total weight / count")

    next_stop = random.randint(0, step - 1)

    results = []
    current = 0
    for i in range(count):
        while borders[current] <= next_stop:
            current += 1
        results.append(weighted_set[current][0])
        next_stop += step

    return results

我认为你可以通过在开始时复制"weighted_set"并随机排列来消除所选元素之间的相关性。 - Jason Orendorff
反思一下,我不确定我该如何证明这一点。 - Jason Orendorff
在这两个方面我也一样 :) - ech

1
如果您想要生成带有替换的大型随机整数数组,可以使用分段线性插值。例如,使用NumPy/SciPy:
import numpy
import scipy.interpolate

def weighted_randint(weights, size=None):
    """Given an n-element vector of weights, randomly sample
    integers up to n with probabilities proportional to weights"""
    n = weights.size
    # normalize so that the weights sum to unity
    weights = weights / numpy.linalg.norm(weights, 1)
    # cumulative sum of weights
    cumulative_weights = weights.cumsum()
    # piecewise-linear interpolating function whose domain is
    # the unit interval and whose range is the integers up to n
    f = scipy.interpolate.interp1d(
            numpy.hstack((0.0, weights)),
            numpy.arange(n + 1), kind='linear')
    return f(numpy.random.random(size=size)).astype(int)

如果你想进行无重复抽样,这种方法并不是很有效。


0

使用递归进行无重复抽样 - 在c#中具有优雅而简短的解决方案

//从60个学生中选择4个人的不同组合方式有多少种

class Program
{
    static void Main(string[] args)
    {
        int group = 60;
        int studentsToChoose = 4;

        Console.WriteLine(FindNumberOfStudents(studentsToChoose, group));
    }

    private static int FindNumberOfStudents(int studentsToChoose, int group)
    {
        if (studentsToChoose == group || studentsToChoose == 0)
            return 1;

        return FindNumberOfStudents(studentsToChoose, group - 1) + FindNumberOfStudents(studentsToChoose - 1, group - 1);

    }
}

0

这个方案使用O(n)的时间复杂度,且不会产生多余的内存使用。我相信这是一个聪明而高效的解决方案,易于移植到任何语言。前两行只是为了在Drupal中填充示例数据。

function getNrandomGuysWithWeight($numitems){
  $q = db_query('SELECT id, weight FROM theTableWithTheData');
  $q = $q->fetchAll();

  $accum = 0;
  foreach($q as $r){
    $accum += $r->weight;
    $r->weight = $accum;
  }

  $out = array();

  while(count($out) < $numitems && count($q)){
    $n = rand(0,$accum);
    $lessaccum = NULL;
    $prevaccum = 0;
    $idxrm = 0;
    foreach($q as $i=>$r){
      if(($lessaccum == NULL) && ($n <= $r->weight)){
        $out[] = $r->id;
        $lessaccum = $r->weight- $prevaccum;
        $accum -= $lessaccum;
        $idxrm = $i;
      }else if($lessaccum){
        $r->weight -= $lessaccum;
      }
      $prevaccum = $r->weight;
    }
    unset($q[$idxrm]);
  }
  return $out;
}

0
我在这里提供一个简单的解决方案,用于选择1个项目,您可以轻松地将其扩展为k个项目(Java风格):
double random = Math.random();
double sum = 0;
for (int i = 0; i < items.length; i++) {
    val = items[i];
    sum += val.getValue();
    if (sum > random) {
        selected = val;
        break;
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接