我在Rust中实现了类似Jason Orendorff的算法这里。我的版本还支持批量操作:在O(m + log n)
时间内从数据结构中插入和删除(当您想通过它们的ID而不是通过加权选择路径来删除一堆项目时)其中m是要删除的项目数,n是存储的项目数。
我刚刚花了几个小时试图理解“无重复抽样”的算法,发现这个主题比我最初想象的要复杂得多。这很令人兴奋!为了未来读者的利益(祝你有美好的一天!),我在下面进一步记录了我的见解,包括一个已经准备好使用的函数,该函数遵守给定的“包含概率”。关于各种方法的简洁数学概述可以在这里找到:Tillé: Algorithms of sampling with equal or unequal probabilities。例如,Jason的方法可以在第46页找到。他的方法的警告是权重与包含概率不成比例,正如文档中所指出的那样。实际上,第i个包含概率可以递归地计算如下:
def inclusion_probability(i, weights, k):
"""
Computes the inclusion probability of the i-th element
in a randomly sampled k-tuple using Jason's algorithm
(see https://dev59.com/D3I95IYBdhLWcg3wzhd9#2149533)
"""
if k <= 0: return 0
cum_p = 0
for j, weight in enumerate(weights):
# compute the probability of j being selected considering the weights
p = weight / sum(weights)
if i == j:
# if this is the target element, we don't have to go deeper,
# since we know that i is included
cum_p += p
else:
# if this is not the target element, than we compute the conditional
# inclusion probability of i under the constraint that j is included
cond_i = i if i < j else i-1
cond_weights = weights[:j] + weights[j+1:]
cond_p = inclusion_probability(cond_i, cond_weights, k-1)
cum_p += p * cond_p
return cum_p
我们可以通过比较来检查上述函数的有效性
In : for i in range(3): print(i, inclusion_probability(i, [1,2,3], 2))
0 0.41666666666666663
1 0.7333333333333333
2 0.85
到
In : import collections, itertools
In : sample_tester = lambda f: collections.Counter(itertools.chain(*(f() for _ in range(10000))))
In : sample_tester(lambda: random_weighted_sample_no_replacement([(1,'a'),(2,'b'),(3,'c')],2))
Out: Counter({'a': 4198, 'b': 7268, 'c': 8534})
一种方法 - 如上文所建议的 - 指定包含概率的方式是从它们计算权重。问题的整个复杂性源于一个无法直接计算权重,因为基本上必须反转递归公式,象征性地说这是不可能的。可以使用各种方法(例如牛顿法)进行数字计算。然而,使用纯Python反转Jacobian的复杂度很快就变得难以承受,我真的建议在这种情况下查看numpy.random.choice
。
幸运的是,有一种使用纯Python的方法,这可能或可能不足以满足您的目的,如果没有太多不同的权重,则效果很好。您可以在第75&76页找到该算法。它通过将取样过程分解为具有相同包含概率的部分来工作,即我们可以再次使用random.sample
!我不会在这里解释原理,因为基础知识在第69页上很好地呈现了。这是带有足够注释的代码:
def sample_no_replacement_exact(items, k, best_effort=False, random_=None, ε=1e-9):
"""
Returns a random sample of k elements from items, where items is a list of
tuples (weight, element). The inclusion probability of an element in the
final sample is given by
k * weight / sum(weights).
Note that the function raises if a inclusion probability cannot be
satisfied, e.g the following call is obviously illegal:
sample_no_replacement_exact([(1,'a'),(2,'b')],2)
Since selecting two elements means selecting both all the time,
'b' cannot be selected twice as often as 'a'. In general it can be hard to
spot if the weights are illegal and the function does *not* always raise
an exception in that case. To remedy the situation you can pass
best_effort=True which redistributes the inclusion probability mass
if necessary. Note that the inclusion probabilities will change
if deemed necessary.
The algorithm is based on the splitting procedure on page 75/76 in:
http://www.eustat.eus/productosServicios/52.1_Unequal_prob_sampling.pdf
Additional information can be found here:
https://dev59.com/D3I95IYBdhLWcg3wzhd9
:param items: list of tuples of type weight,element
:param k: length of resulting sample
:param best_effort: fix inclusion probabilities if necessary,
(optional, defaults to False)
:param random_: random module to use (optional, defaults to the
standard random module)
:param ε: fuzziness parameter when testing for zero in the context
of floating point arithmetic (optional, defaults to 1e-9)
:return: random sample set of size k
:exception: throws ValueError in case of bad parameters,
throws AssertionError in case of algorithmic impossibilities
"""
# random_ defaults to the random submodule
if not random_:
random_ = random
# special case empty return set
if k <= 0:
return set()
if k > len(items):
raise ValueError("resulting tuple length exceeds number of elements (k > n)")
# sort items by weight
items = sorted(items, key=lambda item: item[0])
# extract the weights and elements
weights, elements = list(zip(*items))
# compute the inclusion probabilities (short: π) of the elements
scaling_factor = k / sum(weights)
π = [scaling_factor * weight for weight in weights]
# in case of best_effort: if a inclusion probability exceeds 1,
# try to rebalance the probabilities such that:
# a) no probability exceeds 1,
# b) the probabilities still sum to k, and
# c) the probability masses flow from top to bottom:
# [0.2, 0.3, 1.5] -> [0.2, 0.8, 1]
# (remember that π is sorted)
if best_effort and π[-1] > 1 + ε:
# probability mass we still we have to distribute
debt = 0.
for i in reversed(range(len(π))):
if π[i] > 1.:
# an 'offender', take away excess
debt += π[i] - 1.
π[i] = 1.
else:
# case π[i] < 1, i.e. 'save' element
# maximum we can transfer from debt to π[i] and still not
# exceed 1 is computed by the minimum of:
# a) 1 - π[i], and
# b) debt
max_transfer = min(debt, 1. - π[i])
debt -= max_transfer
π[i] += max_transfer
assert debt < ε, "best effort rebalancing failed (impossible)"
# make sure we are talking about probabilities
if any(not (0 - ε <= π_i <= 1 + ε) for π_i in π):
raise ValueError("inclusion probabilities not satisfiable: {}" \
.format(list(zip(π, elements))))
# special case equal probabilities
# (up to fuzziness parameter, remember that π is sorted)
if π[-1] < π[0] + ε:
return set(random_.sample(elements, k))
# compute the two possible lambda values, see formula 7 on page 75
# (remember that π is sorted)
λ1 = π[0] * len(π) / k
λ2 = (1 - π[-1]) * len(π) / (len(π) - k)
λ = min(λ1, λ2)
# there are two cases now, see also page 69
# CASE 1
# with probability λ we are in the equal probability case
# where all elements have the same inclusion probability
if random_.random() < λ:
return set(random_.sample(elements, k))
# CASE 2:
# with probability 1-λ we are in the case of a new sample without
# replacement problem which is strictly simpler,
# it has the following new probabilities (see page 75, π^{(2)}):
new_π = [
(π_i - λ * k / len(π))
/
(1 - λ)
for π_i in π
]
new_items = list(zip(new_π, elements))
# the first few probabilities might be 0, remove them
# NOTE: we make sure that floating point issues do not arise
# by using the fuzziness parameter
while new_items and new_items[0][0] < ε:
new_items = new_items[1:]
# the last few probabilities might be 1, remove them and mark them as selected
# NOTE: we make sure that floating point issues do not arise
# by using the fuzziness parameter
selected_elements = set()
while new_items and new_items[-1][0] > 1 - ε:
selected_elements.add(new_items[-1][1])
new_items = new_items[:-1]
# the algorithm reduces the length of the sample problem,
# it is guaranteed that:
# if λ = λ1: the first item has probability 0
# if λ = λ2: the last item has probability 1
assert len(new_items) < len(items), "problem was not simplified (impossible)"
# recursive call with the simpler sample problem
# NOTE: we have to make sure that the selected elements are included
return sample_no_replacement_exact(
new_items,
k - len(selected_elements),
best_effort=best_effort,
random_=random_,
ε=ε
) | selected_elements
例子:
In : sample_no_replacement_exact([(1,'a'),(2,'b'),(3,'c')],2)
Out: {'b', 'c'}
In : import collections, itertools
In : sample_tester = lambda f: collections.Counter(itertools.chain(*(f() for _ in range(10000))))
In : sample_tester(lambda: sample_no_replacement_exact([(1,'a'),(2,'b'),(3,'c'),(4,'d')],2))
Out: Counter({'a': 2048, 'b': 4051, 'c': 5979, 'd': 7922})
a
→ 20%,b
→ 40%,c
→ 60%,d
→ 80%。(总和:200%= k。)它有效!In: sample_no_replacement_exact([(1,'a'),(2,'b')],2)
ValueError: inclusion probabilities not satisfiable: [(0.6666666666666666, 'a'), (1.3333333333333333, 'b')]
b
不能出现比a
更频繁,因为两者必须始终被选择。还有更微妙的例子。为了避免在生产中出现异常,请使用best_effort=True,它重新平衡包含概率质量,以便我们始终具有有效的分布。显然,这可能会改变包含概率。
我使用了一个关联映射(权重,对象)。例如:
{
(10,"low"),
(100,"mid"),
(10000,"large")
}
total=10110
在0和'total'之间随机选择一个数字,并迭代键,直到此数字适合给定范围。