在Python中连续查找数字流的中位数最有效的方法是什么?

6

我正在尝试解决以下问题:

一队急切的个位数(您的输入)正在等待进入空房间。

我允许一个数字(从左边)每分钟进入房间。

每次新数字进入房间时,我在黑板上记录当前所有数字的中位数。[中位数是按升序排列时的中间数字。]。如果有两个中位数(即两个中间数),则不使用平均值,而是记录其中较小的一个。

我将新数字写在现有数字的右侧,因此我的黑板数字不断变长。

当所有数字都在房间里时,您的黑板上最终会出现什么数字?

考虑以下示例输入:21423814127333

  • 2(最左边的数字)被允许进入房间,因此我在黑板上写上2。
  • 接下来1被允许进入房间与2一起。这两个数字中较小的一个被用作中位数,因此我在黑板上将1标记在2的右侧(我的数字现在是21)
  • 现在4进入了房间。1、2和4的中位数是2,因此我在黑板上添加2(我的数字现在是212)
  • ...这个过程一直持续到最后3进入房间...所有数字现在都在房间里,经排序后分别为1,1,1,2,2,2,3,3,3,3,4,7,8,8.有两个中位数,但它们都是3,因此我在黑板上加上3,我的最终数字是21222222222233。

我的当前解决方案:

num = input()
new = str(num[0])
whole = [num[0]]

for i in range(1, len(num)):
    whole.append(num[i])
    whole.sort()
    new += whole[i//2]

print(new)

问题在于它花费的时间太长了 - 因此它通过了6/10(隐藏)测试用例,但超出了其他4个的时间限制。任何帮助将不胜感激。

3
不必每次对整个列表进行排序,可以使用二分查找来找到适当的插入位置。当列表变得很大时,这将产生巨大的差异。Python的bisect模块可以帮助实现这一点。 - Tim Roberts
考虑利用列表中只有少数可能值的事实。与其实际构建一个列表,不如简单地计算每个数字出现的次数。您能看到如何使用数字计数上的数学方法来找到中位数吗? - Karl Knechtel
4个回答

4

您正在进行重复排序, 使用关键比较, 因此总成本为O(N * N log N), 也就是说,至少是二次方。

个位数(您的输入)正在等待进入

解决这个问题的关键是输入范围限制。 我们知道每个输入x都在此范围内:

0 <= x < 10

使用 计数器。 我们可以轻松地分配十个。

保持运行总数的数字数量已经被允许进入房间。 每次您需要报告中位数时,请计算 总和 有序计数器,停止当您到达总数的一半时。

max_val = 10
counter = {i: 0  for i in range(max_val)}
...
assert 0 <= input_val < max_val

counter[input_val] += 1

cum_sum = 0
for i in range(max_val):
    cum_sum += counter[i]
    ...

由于中位数是一个稳健的统计量, 通常报告的中位数会有一定的稳定性,例如“2, 1, 2, 2, 2, 2”。 您可以使用它来加速计算,通过逐步计算累积和。 但这不会改变大O复杂度,因为有恒定数量的计数器。 我们仍然看到O(N),因为我们必须检查每个进入房间的N个数字,然后报告当前的中位数。 这比依赖于对有序向量进行二分的方法的O(N log N)成本要好。


1
你可以通过维护计数器的累加和来进行优化,而不是仅维护计数器本身。当N到达时,将1添加到所有大于等于N的计数器中,而不仅仅是N。 - Mad Physicist
1
然后,您可以利用中位数的鲁棒性,每次从先前的中位数开始检查,并决定新中位数所在的半部分是否发生了变化。 - Mad Physicist

1

如果您为每个数字保留一个计数器(在列表中),则实际上已经隐式地表示了排序序列。然后有一个指向当前中位数的“指针”:这个指针由两个组成部分组成:中位数本身(它是计数器列表中的索引)以及真正表示中位数的值的出现次数。

当处理新的输入数字时,您可以决定是否应更新此指针。它要么不更新,要么在排序(隐式)列表中向前或向后移动1个单位。

代码:

def generatemedians(iterable):
    counter = [0] * 10  # a counter for each digit

    it = map(int, iterable)
    # Process the first entry
    median = next(it, None)
    if median is None:
        return  # No values
    medianidx = 0
    counter[median] = 1
    yield median

    # Process the other entries
    for i, digit in enumerate(it):
        counter[digit] += 1
        if i % 2 == 0:  # the total number of digits becomes even
            if digit < median:  # The median only changes if the digit is inferior
                if medianidx:
                    medianidx -= 1
                else:
                    median -= 1
                    while not counter[median]:
                        median -= 1
                    medianidx = counter[median] - 1
        else:  # the number of digits becomes odd
            if digit >= median:  # The median doesn't change if the digit is inferior
                if medianidx < counter[median] - 1:
                    medianidx += 1
                else:
                    median += 1
                    while not counter[median]:
                        median += 1
                    medianidx = 0
        yield median

# main 
num = input()
print(*generatemedians(num))

外层循环的一次迭代需要恒定时间,即使median += 1median -= 1需要执行多次,因为median的范围是0..9。


1

由于 whole 已经排序,您可以使用 bisect.insort 插入新项并保持其排序:

from bisect import insort
num = input()
new = str(num[0])
whole = [num[0]]

for i in range(1, len(num)):
    insort(whole, num[i])
    new += whole[i//2]

print(new)

0

维护一个每个数字的累积计数列表,并通过找到对应至少一半已添加数字的第一个位置来计算中位数:

def runMed(S):
    cum = [0]*10
    for i,digit in enumerate(map(int,S),1):
        cum[digit:] = (c+1 for c in cum[digit:])
        yield next(m for m,c in enumerate(cum) if c*2>=i)

输出:

S = "21423814127333"
print(*runMed(S))
# 2 1 2 2 2 2 2 2 2 2 2 2 3 3

每个数字最多需要20次迭代才能产生中位数,从而得到O(n)的解决方案。

当我阅读你的代码时,我期望会出现一个NameError;看起来它在赋值之前引用了n - Chris Wesseling
抱歉,我将“n”更改为“digit”,并忘记了一个“n”,试图使名称更有意义。现在已修复。 - Alain T.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接