从未知长度的序列中,在仅进行一次迭代的情况下随机选择N个不同的项

50

我正在尝试编写一个算法,从序列中随机选择N个不同的项目,而不事先知道序列的大小,并且在迭代超过一次的情况下成本昂贵。例如,序列的元素可能是一个巨大文件的行。

当N=1时(即,“从巨大序列中随机选择一个元素”),我已经找到了一个解决方案:

import random
items = range(1, 10) # Imagine this is a huge sequence of unknown length
count = 1
selected = None
for item in items:
    if random.random() * count < 1:
        selected = item
    count += 1

但是我怎样才能对其他值的N(比如说,N=3)实现相同的事情呢?


6
虽然这不是回答问题的答案,但是请注意对于内置集合(以及许多其他集合),您可以使用random.sample(your_collection, N)来获取N个随机元素。请注意不要改变原意。 - Mark Amery
你说“事先不知道序列的大小”,但是你的代码示例中使用了上限 range(1, 10)。这真的是一个 XY 问题,询问“如何确定/估计迭代器长度的上限(而不进行迭代)”吗?例如,如果它是一个文本文件,我们只需获取(/估计)文件大小,然后除以估计的平均/最大/最小行长度(以字符为单位)。 (对于 Unicode,估计字节中的字符长度) - smci
从3.6/PEP 424开始,对象现在可以选择性地拥有__length_hint __()当我预先知道可迭代类的长度时,我能加速它吗?。而且,通常不需要将整个文件调用到内存中来估计其行长/记录长度/任何其他内容。那么,您正在处理什么类型的数据,我们如何高效地估计(上限)其长度? - smci
10个回答

86

如果您的序列足够短,可以将其读入内存并随机排序,则一种直接的方法是只使用 random.shuffle

import random
arr=[1,2,3,4]

# In-place shuffle
random.shuffle(arr)

# Take the first 2 elements of the now randomized array
print arr[0:2]
[1, 3]

根据您的序列类型,您可能需要调用list(your_sequence)将其转换为列表,但这将在您的序列中对象的类型不同的情况下仍然有效。

如果您的序列无法适应内存,或者此方法的内存或CPU要求对您来说太高,则需要使用其他解决方案。


5
数组的大小是“未知的”或者“不可能知道的”,而且它可能非常巨大。例如,从100G的数据流中随机选择n个元素。 - Jackson Tale
优雅的解决方案 - Mehmet Burak Sayıcı

51

使用蓄水池抽样。这是一个非常简单的算法,适用于任何N

这里是一个Python实现,这里是另外一个实现。


47

我发现最简单的方法是在SO中找到这个答案,稍作改进如下:

import random

my_list = [1, 2, 3, 4, 5]
how_big = 2

new_list = random.sample(my_list, how_big)

# To preserve the order of the list, you could do:
randIndex = random.sample(range(len(my_list)), how_big)
randIndex.sort()
new_list = [my_list[i] for i in randIndex]

6
我猜这应该是最佳答案。 - matanster
1
这应该是所选答案。 - Chau Pham

19

如果您使用的是Python 3.6以上版本,则可以使用choices。

from random import choices

items = range(1, 10)
new_items = choices(items, k = 3)

print(new_items) 
[6, 3, 1]

1
很好的答案,但只适用于3.6+。 - Dan
5
但这可能会有重复! - Pawan Mishra
2
与其他答案不同的是,如果 k > len(items),这也可以工作。正是我所需要的,谢谢! - Nick K9

4
以下代码将从数组 X 中随机选择 N 个项目:
import random
list(map(lambda _: random.choice(X), range(N)))

2
这不会给出不同的元素:>>> x = ["a", "b", "c", "d", "e", "f", "g", "h", "i"] >>> list(map(lambda _: random.choice(x), range(3))) ['c', 'a', 'a'] - Gábor Nagy

4

@NPE是正确的,但被链接的实现是次优的,不太符合"pythonic"(Python风格)的特点。这里提供更好的实现方式:

def sample(iterator, k):
    """
    Samples k elements from an iterable object.

    :param iterator: an object that is iterable
    :param k: the number of items to sample
    """
    # fill the reservoir to start
    result = [next(iterator) for _ in range(k)]

    n = k - 1
    for item in iterator:
        n += 1
        s = random.randint(0, n)
        if s < k:
            result[s] = item

    return result

编辑 正如@panda-34所指出的那样,原版本存在缺陷,但不是因为我使用了 randint 而不是 randrange。问题在于我的初始值 n 没有考虑到 randint 是包含范围两端的。如果考虑到这一点,问题就得到解决了。(注意:你也可以使用 randrange,因为它在最小值上是包含的,在最大值上是排除的。)


1
快速检查Counter(itertools.chain.from_iterable(sample(iter(range(100)), 5) for x in range(100000)))显示出对范围开始部分的严重和一致的偏向。 - panda-34
罪魁祸首是使用了 randint 而不是 randrange - panda-34
1
@panda-34,感谢你的提醒!我根据你的评论更新了答案以解决这个问题。 - JesseBuesking
在这里使用randrange会更整洁,而不是从范围的末尾显式地减去1,以使randint的行为类似于randrange - Mark Amery

3

只需要接受或拒绝每个新项目一次就足够了,如果你接受它,就随机选择一个旧项目丢弃。

假设您随机选择了K个中的N个项目,并看到第(K+1)个项目。以N/(K+1)的概率接受它,其概率是OK的。当前项目以N/K的概率进入,并以(N/(K+1))(1/N) = 1/(K+1)的概率被丢弃,因此通过(N/K)(K/(K+1))的概率幸存下来,所以它们的概率也是OK的。

是的,我看到有人向您指出了蓄水池抽样 - 这就是它的工作原理之一的解释。


2
如aix所提到的,蓄水池抽样是可行的。另一种选择是为每个数字生成一个随机数,并选择前k个数字。
要进行迭代操作,维护一个由k个(随机数,数字)对组成的堆,并在遇到新数字时将其插入堆中,如果它大于堆中最小值,则替换掉最小值。

我喜欢这个 - 很容易看出它的工作原理,因为你只是为序列中的每个条目生成一个随机数并取前k个。另一方面,水塘抽样乍一看似乎可能有效,但需要一些思考和计算才能证明它的可行性。 - Mark Amery

0

numpy 库中有一个实现。

假设 N 小于数组的长度,你需要执行以下操作:

# my_array is the array to be sampled from
assert N <= len(my_array)
indices = np.random.permutation(N)  # Generates shuffled indices from 0 to N-1
sampled_array = my_array[indices]

如果您需要对整个数组进行取样,而不仅仅是前面的 N 个位置,则可以使用以下代码:
import random
sampled_array = my_array[random.sample(len(my_array), N)]

0
这是我对一个重复问题的回答(在我发布之前关闭),与之有些关联(“生成没有重复数字的随机数”)。由于这种方法与其他答案不同,我会将它留在这里,以便在需要时提供额外的见解。
from random import randint

random_nums = []
N = # whatever number of random numbers you want
r = # lower bound of number range
R = # upper bound of number range

x = 0

while x < N:
    random_num = randint(r, R) # inclusive range
    if random_num in random_nums:
        continue
    else:
        random_nums.append(random_num)
        x += 1

while循环比for循环更好的原因是它允许更容易地实现随机生成中的非跳过(即如果您获得3个重复项,则不会获得N-3个数字)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接