Python中的概率分布

21
我有许多钥匙,每个都有一个不太可能的变量。我想随机选择其中一个钥匙,但是我希望对于不太可能(键、值)被选择而言,它比较不太可能被选中,而对于较可能(更可能)的对象来说,则更容易被选中。我想知道你是否有任何建议,最好是我可以使用的现有Python模块,否则我将需要自己制作。
我已经查看了随机模块;它似乎没有提供这个功能。
我需要为包含2,455个对象的1000个不同集合做出这样的选择数百万次。每个集合将在彼此之间交换物品,因此随机选择器需要具有动态性。对于包含2,433个对象的1000个集合,总共有2,433亿个对象;低内存消耗至关重要。由于这些选择不是算法的主要部分,我需要这个过程非常快速;CPU时间有限。
谢谢。
更新:
好的,我尽力考虑了您的建议,但时间太紧了...
我查看了二叉搜索树方法,似乎太冒险了(复杂和复杂)。其他建议都类似于ActiveState配方。我获取了它并稍作修改,希望能够更有效。
def windex(dict, sum, max):
    '''an attempt to make a random.choose() function that makes
    weighted choices accepts a dictionary with the item_key and
    certainty_value as a pair like:
    >>> x = [('one', 20), ('two', 2), ('three', 50)], the
    maximum certainty value (max) and the sum of all certainties.'''
    n = random.uniform(0, 1)
    sum = max*len(list)-sum 
    for key, certainty in dict.iteritems():
        weight = float(max-certainty)/sum
        if n < weight:
            break
        n = n - weight
    return key

我希望通过动态维护置信度的和以及最大置信度来提高效率。欢迎提出其他建议。你们为我节省了很多时间和精力,同时提高了我的效率,这太不可思议了。谢谢!谢谢!谢谢!

更新2:

我决定通过让它一次选择更多的选项来使其更加高效。虽然这会导致算法精度有所损失,但由于其具有动态性质,因此这是可以接受的。无论如何,这就是我现在拥有的:

def weightedChoices(dict, sum, max, choices=10):
    '''an attempt to make a random.choose() function that makes
    weighted choices accepts a dictionary with the item_key and
    certainty_value as a pair like:
    >>> x = [('one', 20), ('two', 2), ('three', 50)], the
    maximum certainty value (max) and the sum of all certainties.'''
    list = [random.uniform(0, 1) for i in range(choices)]
    (n, list) = relavate(list.sort())
    keys = []
    sum = max*len(list)-sum 
    for key, certainty in dict.iteritems():
        weight = float(max-certainty)/sum
        if n < weight:
            keys.append(key)
            if list: (n, list) = relavate(list)
            else: break
        n = n - weight
    return keys
def relavate(list):
    min = list[0]
    new = [l - min for l in list[1:]]
    return (min, new)

我还没有试过它。如果您有任何意见/建议,请不要犹豫。谢谢!

更新3:

我整天都在为Rex Logan的答案定制任务版本而努力工作。这个版本不像是2个对象和权重的数组,而是一个特殊的字典类;由于Rex的代码生成随机索引,所以事情变得相当复杂... 我还编写了一个测试用例,它有点类似于我的算法将发生的情况(但在尝试之前我不太确定!)基本原则是:随机生成的键越多,再次生成的可能性就越小:

import random, time
import psyco
psyco.full()

class ProbDict():
    """
    Modified version of Rex Logans RandomObject class. The more a key is randomly
    chosen, the more unlikely it will further be randomly chosen. 
    """
    def __init__(self,keys_weights_values={}):
        self._kw=keys_weights_values
        self._keys=self._kw.keys()
        self._len=len(self._keys)
        self._findSeniors()
        self._effort = 0.15
        self._fails = 0
    def __iter__(self):
        return self.next()
    def __getitem__(self, key):
        return self._kw[key]
    def __setitem__(self, key, value):
        self.append(key, value)
    def __len__(self):
        return self._len
    def next(self):
        key=self._key()
        while key:
            yield key
            key = self._key()
    def __contains__(self, key):
        return key in self._kw
    def items(self):
        return self._kw.items()
    def pop(self, key):  
        try:
            (w, value) = self._kw.pop(key)
            self._len -=1
            if w == self._seniorW:
                self._seniors -= 1
                if not self._seniors:
                    #costly but unlikely:
                    self._findSeniors()
            return [w, value]
        except KeyError:
            return None
    def popitem(self):
        return self.pop(self._key())
    def values(self):
        values = []
        for key in self._keys:
            try:
                values.append(self._kw[key][1])
            except KeyError:
                pass
        return values
    def weights(self):
        weights = []
        for key in self._keys:
            try:
                weights.append(self._kw[key][0])
            except KeyError:
                pass
        return weights
    def keys(self, imperfect=False):
        if imperfect: return self._keys
        return self._kw.keys()
    def append(self, key, value=None):
        if key not in self._kw:
            self._len +=1
            self._kw[key] = [0, value]
            self._keys.append(key)
        else:
            self._kw[key][1]=value
    def _key(self):
        for i in range(int(self._effort*self._len)):
            ri=random.randint(0,self._len-1) #choose a random object
            rx=random.uniform(0,self._seniorW)
            rkey = self._keys[ri]
            try:
                w = self._kw[rkey][0]
                if rx >= w: # test to see if that is the value we want
                    w += 1
                    self._warnSeniors(w)
                    self._kw[rkey][0] = w
                    return rkey
            except KeyError:
                self._keys.pop(ri)
        # if you do not find one after 100 tries then just get a random one
        self._fails += 1 #for confirming effectiveness only
        for key in self._keys:
            if key in self._kw:
                w = self._kw[key][0] + 1
                self._warnSeniors(w)
                self._kw[key][0] = w
                return key
        return None
    def _findSeniors(self):
        '''this function finds the seniors, counts them and assess their age. It
        is costly but unlikely.'''
        seniorW = 0
        seniors = 0
        for w in self._kw.itervalues():
            if w >= seniorW:
                if w == seniorW:
                    seniors += 1
                else:
                    seniorsW = w
                    seniors = 1
        self._seniors = seniors
        self._seniorW = seniorW
    def _warnSeniors(self, w):
        #a weight can only be incremented...good
        if w >= self._seniorW:
            if w == self._seniorW:
                self._seniors+=1
            else:
                self._seniors = 1
                self._seniorW = w
def test():
    #test code
    iterations = 200000
    size = 2500
    nextkey = size 


    pd = ProbDict(dict([(i,[0,i]) for i in xrange(size)]))
    start = time.clock()
    for i in xrange(iterations):
        key=pd._key()
        w=pd[key][0]
        if random.randint(0,1+pd._seniorW-w):
            #the heavier the object, the more unlikely it will be removed
            pd.pop(key)
        probAppend = float(500+(size-len(pd)))/1000
        if random.uniform(0,1) < probAppend:
            nextkey+=1
            pd.append(nextkey)
    print (time.clock()-start)*1000/iterations, "msecs / iteration with", pd._fails, "failures /", iterations, "iterations"
    weights = pd.weights()
    weights.sort()
    print "avg weight:", float(sum(weights))/pd._len, max(weights), pd._seniorW, pd._seniors, len(pd), len(weights)
    print weights
test()

仍然欢迎任何评论。@Darius:你的二叉树对我来说太复杂了,我不认为它的叶子节点可以有效地被移除... 谢谢大家


如果您能准确说明概率应该如何取决于“信念”,那么给出更精确的答案会更容易些。 - David Z
PHP中二分查找方法的大部分复杂性都在二分查找函数本身中,而Python则将其包含在标准库中(即bisect模块)。 - Darius Bacon
@Darius:是的,但我仍然害怕它,因为我不理解它,也从未使用过它。而且我想知道它是否真的值得?你会如何实现它? - Nicholas Leonard
你应该看一下https://dev59.com/kuo6XIcBkEYKwwoYTzNK,它不需要在重量改变时重新计算,而且如果你有4个或2433个物品,它的速度都一样快。存储只是权重,所以不需要生成大的地图。 - Rex Logan
@Rex:我会尝试的。(我看了你的第一个答案;现在我看到你有第二个答案)。我喜欢你的想法。起初它似乎很复杂。但是你对代码的介绍使其易于理解。实际上,它似乎更直观。谢谢(现在是时候尝试集成它了)。 - Nicholas Leonard
显示剩余7条评论
12个回答

26

此Activestate食谱提供了一种易于跟随的方法,特别是评论中的版本,它不需要您预先规范化权重:

import random

def weighted_choice(items):
    """items is a list of tuples in the form (item, weight)"""
    weight_total = sum((item[1] for item in items))
    n = random.uniform(0, weight_total)
    for item, weight in items:
        if n < weight:
            return item
        n = n - weight
    return item
如果您有一个大型项目列表,这将会很慢。在这种情况下,二分查找可能会更好……但是如果您的样本量较小,则编写它也会更加复杂,而收益却微不足道。如果您想要采用这种方法,这里是Python中二分查找的示例

(我建议您对数据集进行一些快速性能测试,因为这种算法的不同方法的性能通常有点晦涩难懂。)


编辑: 我按照自己的建议进行了一些测试,因为我很好奇。

我比较了四个方法:

上面提到的weighted_choice函数。

像这样的二分查找选择函数:

def weighted_choice_bisect(items):
    added_weights = []
    last_sum = 0

    for item, weight in items:
        last_sum += weight
        added_weights.append(last_sum)

    return items[bisect.bisect(added_weights, random.random() * last_sum)][0]

编译版本的1:

def weighted_choice_compile(items):
    """returns a function that fetches a random item from items

    items is a list of tuples in the form (item, weight)"""
    weight_total = sum((item[1] for item in items))
    def choice(uniform = random.uniform):
        n = uniform(0, weight_total)
        for item, weight in items:
            if n < weight:
                return item
            n = n - weight
        return item
    return choice

2的编译版本:

def weighted_choice_bisect_compile(items):
    """Returns a function that makes a weighted random choice from items."""
    added_weights = []
    last_sum = 0

    for item, weight in items:
        last_sum += weight
        added_weights.append(last_sum)

    def choice(rnd=random.random, bis=bisect.bisect):
        return items[bis(added_weights, rnd() * last_sum)][0]
    return choice
我接下来创建了一个类似于这样的大列表:
choices = [(random.choice("abcdefg"), random.uniform(0,50)) for i in xrange(2500)]

这是一个过于简单的性能分析函数:

def profiler(f, n, *args, **kwargs):
    start = time.time()
    for i in xrange(n):
        f(*args, **kwargs)
    return time.time() - start

结果:

(对函数进行了1,000次调用所需的时间,以秒为单位。)

  • 简单未编译: 0.918624162674
  • 二进制未编译: 1.01497793198
  • 简单编译: 0.287325024605
  • 二进制编译: 0.00327413797379

"编译"结果包括编译选择函数一次的平均时间。(我计时了1,000次编译,然后将该时间除以1,000,并将结果添加到选择函数时间中。)

所以: 如果您有一个很少更改的项目+权重列表,则二进制编译方法远远是最快的。


我不明白为什么后面的函数可以编译通过,而前面的函数不能(未编译通过)。你能解释一下或者给我一些相关信息吗?非常感谢! - Nicholas Leonard
3
“compile”并不是准确的词语,实际上应该是“工厂模式”。这些函数会尽可能地预先计算工作,然后返回一个能够执行选择部分的新函数(一个“闭包”)。 - David

6
在原帖的评论中,Nicholas Leonard建议交换和采样都需要快速。这里有一个想法:如果只有采样需要快速,我们可以使用值的数组以及它们概率的运行总和,并在运行总和上进行二进制搜索(键为均匀随机数)——这是一个O(log(n))操作。但是交换需要更新所有在交换条目之后出现的运行总和值——这是一个O(n)操作。(你能选择仅交换其列表末尾附近的项目吗?我假设不能。)
因此,让我们在两个操作中都达到O(log(n))。不要使用数组,而是为每个要从中采样的集合保留二叉树。叶子节点保存样本值及其(未归一化的)概率。分支节点保存其子节点的总概率。
要进行采样,请生成0到根节点的总概率之间的均匀随机数x,并下降树。在每个分支处,如果左侧子节点的总概率<=x,则选择左侧子节点。否则,从x中减去左侧子节点的概率并向右移动。返回到达的叶子值。
要进行交换,请从树中删除叶子并调整导向它的分支(减少它们的总概率,并剪切任何单个子分支节点)。将叶子插入目标树:你可以选择将其放在哪里,所以保持平衡。在每个级别上随机选择一个子节点可能就足够了——这是我开始的地方。将每个父节点的概率增加到根。
现在,平均采样和交换都是O(log(n))。(如果需要保证平衡,则简单的方法是向分支节点添加另一个字段,其中包含整个子树中叶子的计数。添加叶子时,在每个级别上选择具有较少叶子的子节点。这会留下一个树仅通过删除而不平衡的可能性;如果集合之间的流量相当均匀,则这不会成为问题,但如果存在问题,则在遍历中使用每个节点上的叶计数信息选择旋转。)
更新:应请求,这里有一个基本实现。没有进行任何调整。用法:
>>> t1 = build_tree([('one', 20), ('two', 2), ('three', 50)])
>>> t1
Branch(Leaf(20, 'one'), Branch(Leaf(2, 'two'), Leaf(50, 'three')))
>>> t1.sample()
Leaf(50, 'three')
>>> t1.sample()
Leaf(20, 'one')
>>> t2 = build_tree([('four', 10), ('five', 30)])
>>> t1a, t2a = transfer(t1, t2)
>>> t1a
Branch(Leaf(20, 'one'), Leaf(2, 'two'))
>>> t2a
Branch(Leaf(10, 'four'), Branch(Leaf(30, 'five'), Leaf(50, 'three')))

代码:

import random

def build_tree(pairs):
    tree = Empty()
    for value, weight in pairs:
        tree = tree.add(Leaf(weight, value))
    return tree

def transfer(from_tree, to_tree):
    """Given a nonempty tree and a target, move a leaf from the former to
    the latter. Return the two updated trees."""
    leaf, from_tree1 = from_tree.extract()
    return from_tree1, to_tree.add(leaf)

class Tree:
    def add(self, leaf):
        "Return a new tree holding my leaves plus the given leaf."
        abstract
    def sample(self):
        "Pick one of my leaves at random in proportion to its weight."
        return self.sampling(random.uniform(0, self.weight))
    def extract(self):
        """Pick one of my leaves and return it along with a new tree
        holding my leaves minus that one leaf."""
        return self.extracting(random.uniform(0, self.weight))        

class Empty(Tree):
    weight = 0
    def __repr__(self):
        return 'Empty()'
    def add(self, leaf):
        return leaf
    def sampling(self, weight):
        raise Exception("You can't sample an empty tree")
    def extracting(self, weight):
        raise Exception("You can't extract from an empty tree")

class Leaf(Tree):
    def __init__(self, weight, value):
        self.weight = weight
        self.value = value
    def __repr__(self):
        return 'Leaf(%r, %r)' % (self.weight, self.value)
    def add(self, leaf):
        return Branch(self, leaf)
    def sampling(self, weight):
        return self
    def extracting(self, weight):
        return self, Empty()

def combine(left, right):
    if isinstance(left, Empty): return right
    if isinstance(right, Empty): return left
    return Branch(left, right)

class Branch(Tree):
    def __init__(self, left, right):
        self.weight = left.weight + right.weight
        self.left = left
        self.right = right
    def __repr__(self):
        return 'Branch(%r, %r)' % (self.left, self.right)
    def add(self, leaf):
        # Adding to a random branch as a clumsy way to keep an
        # approximately balanced tree.
        if random.random() < 0.5:
            return combine(self.left.add(leaf), self.right)
        return combine(self.left, self.right.add(leaf))
    def sampling(self, weight):
        if weight < self.left.weight:
            return self.left.sampling(weight)
        return self.right.sampling(weight - self.left.weight)
    def extracting(self, weight):
        if weight < self.left.weight:
            leaf, left1 = self.left.extracting(weight)
            return leaf, combine(left1, self.right)
        leaf, right1 = self.right.extracting(weight - self.left.weight)
        return leaf, combine(self.left, right1)

更新2:回答另一个问题时,Jason Orendorff指出可以通过在数组中表示二叉树来保持其完美平衡,就像经典堆结构一样。(这也节省了指针所占用的空间。)请参见我对该答案的评论,以了解如何将他的代码适应于此问题。


太棒了!但既然你最懂它,能否实现一下呢?谢谢。 - Nicholas Leonard
哇...你真是很有毅力。我会尝试将你的方法整合到我的算法中。 - Nicholas Leonard
我必须承认,你的树似乎有很大的潜力,但我担心它们的复杂性。也许你可以做到? :D 再次感谢 - Nicholas Leonard
抱歉,我得了 RSI 症候群。我已经打了比我想要的更多的按键了。但后两种方法很简单(我同意反向指针方法有点棘手)。 - Darius Bacon
关于复杂度,我应该更加强调,我会从基本的线性搜索开始,并测量它在您的实际问题上的表现。通过您原始帖子中的实际数字,您可以最好地选择算法。这个算法只是对答案做出最少的假设。 - Darius Bacon
显示剩余5条评论

2
我会使用这个配方。您需要为您的对象添加权重,但这只是一个简单的比例,并将它们放入元组列表(对象,信念/(信念总和))。这应该很容易使用列表理解来完成。

2

2

以下是一种经典的方法,使用伪代码实现,其中random.random()可以生成0到1之间的随机浮点数。

let z = sum of all the convictions
let choice = random.random() * z 
iterate through your objects:
    choice = choice - the current object's conviction
    if choice <= 0, return this object
return the last object

例如:假设你有两个物体,一个重量为2,另一个重量为4。你生成一个从0到6的数字。如果“choice”在0到2之间,即以2/6 = 1/3的概率发生,那么它将减去2并选择第一个物体。如果选择在2和6之间,即以4/6 = 2/3的概率发生,则第一次减法仍会使选择大于0,第二次减法将使第二个物体被选择。

2
你希望给每个对象赋予一个权重。权重越大,发生的可能性就越大。更精确地说,probx =weight/sum_all_weights。
然后在0到sum_all_weights的范围内生成一个随机数,并将其映射到每个对象上。
这段代码允许你生成一个随机索引,并在创建对象时进行映射以提高速度。如果你所有的对象集都具有相同的分布,那么你只需要一个RandomIndex对象即可。
import random

class RandomIndex:
    def __init__(self, wlist):
        self._wi=[]
        self._rsize=sum(wlist)-1
        self._m={}
        i=0
        s=wlist[i]
        for n in range(self._rsize+1):
            if n == s:
                i+=1
                s+=wlist[i]
            self._m[n]=i    

    def i(self):
        rn=random.randint(0,self._rsize)
        return self._m[rn]


sx=[1,2,3,4]


wx=[1,10,100,1000] #weight list
ri=RandomIndex(wx)

cnt=[0,0,0,0]

for i in range(1000):
    cnt[ri.i()] +=1  #keep track of number of times each index was generated

print(cnt)  

2

大约三年后...

如果您使用numpy,可能最简单的选择是使用np.random.choice,它接受一个可能值的列表和一个可选的与每个值相关联的概率序列:

import numpy as np

values = ('A', 'B', 'C', 'D')
weights = (0.5, 0.1, 0.2, 0.2)

print ''.join(np.random.choice(values, size=60, replace=True, p=weights))
# ACCADAACCDACDBACCADCAAAAAAADACCDCAADDDADAAACCAAACBAAADCADABA

1
(一年后) {{link1:Walker的别名方法用于具有不同概率的随机对象}}非常快速且非常简单。

1

这里有一个更好的答案,针对一种特殊的概率分布,Rex Logan's answer 的回答似乎是针对这个问题的。该分布如下:每个对象都有一个介于0到100之间的整数权重,其概率与其权重成比例。既然这是目前被接受的答案,我想这值得考虑。

因此,保持一个101个箱子的数组。每个箱子都包含具有特定重量的所有对象的列表。每个箱子还知道其所有对象的重量。

要进行抽样:按比例随机选择一个箱子(使用其中一个标准配方——线性或二进制搜索)。然后从箱子中均匀随机地选择一个对象。

要转移对象:将其从其箱子中移除,放入目标箱子中,并更新两个箱子的权重。(如果您正在使用二进制搜索进行抽样,则还必须更新使用的运行总和。由于箱子不多,因此这仍然相当快速。)


@Darius 我的分布与你的一样,都是加权分布。我不得不将权重强制设定为我选择的100.0的最大值。请注意,这些权重是浮点数而不是整数。我使用了马尔可夫链来进行递归方法,以便将权重解耦。 - Rex Logan

1

最简单的方法是使用random.choice(它使用均匀分布),并在源集合中变化出现频率。

>>> random.choice([1, 2, 3, 4])
4

... vs:

>>> random.choice([1, 1, 1, 1, 2, 2, 2, 3, 3, 4])
2

因此,您的对象可以具有基本发生率(n),并且在信念率的作用下,将1到n个对象添加到源集合中。这种方法非常简单;但是,如果不同对象的数量很大或信念率需要非常细粒度,则可能会产生重大开销。

或者,如果您使用均匀分布生成更多的随机数并将它们相加,则接近平均值的数字比接近极端值的数字更有可能出现(想象一下掷两个骰子并获得7与12或2的概率)。然后,您可以按信念率对对象进行排序,并使用多个骰子卷生成一个数字,该数字用于计算和索引到对象。使用接近平均值的数字来索引低信念对象,使用接近极端值的数字来索引高信念物品。您可以通过更改“面数”和“骰子”的数量来变化给定对象被选择的确切概率(将对象放入桶中并使用具有少量面数的骰子可能更简单,而不是尝试将每个对象与特定结果相关联):

>>> die = lambda sides : random.randint(1, sides)
>>> die(6)
3
>>> die(6) + die(6) + die(6)
10

谢谢,我刚刚在考虑这个问题。问题是它会占用太多的RAM内存。我还在努力实现更高效的方法。另外要记住的一点是这些概率会有所变化。 - Nicholas Leonard
等一下。我刚刚注意到你是动态创建表格的。这真是太棒了。看起来更可行。但现在我想知道,如果要对包含2,433个对象的集合进行这样的操作数百万次,需要多少CPU时间? - Nicholas Leonard

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接