重复计算百分位数的快速算法?

38

在一个算法中,我需要每次添加一个值时计算数据集的 75th 百分位数。目前我正在这样做:

  1. 获取值 x
  2. x 插入到已经排序好的数组的末尾
  3. x 向下交换,直到数组排序完成
  4. 读取位置为 array[array.size * 3/4] 的元素

第3步的时间复杂度为 O(n),其余时间复杂度为 O(1),但仍然很慢,特别是当数组变得更大时。有没有什么方法可以优化这个过程?

更新

谢谢 Nikita!由于我正在使用 C++,这是最容易实现的解决方案。以下是代码:

template<class T>
class IterativePercentile {
public:
  /// Percentile has to be in range [0, 1(
  IterativePercentile(double percentile)
    : _percentile(percentile)
  { }

  // Adds a number in O(log(n))
  void add(const T& x) {
    if (_lower.empty() || x <= _lower.front()) {
      _lower.push_back(x);
      std::push_heap(_lower.begin(), _lower.end(), std::less<T>());
    } else {
      _upper.push_back(x);
      std::push_heap(_upper.begin(), _upper.end(), std::greater<T>());
    }

    unsigned size_lower = (unsigned)((_lower.size() + _upper.size()) * _percentile) + 1;
    if (_lower.size() > size_lower) {
      // lower to upper
      std::pop_heap(_lower.begin(), _lower.end(), std::less<T>());
      _upper.push_back(_lower.back());
      std::push_heap(_upper.begin(), _upper.end(), std::greater<T>());
      _lower.pop_back();
    } else if (_lower.size() < size_lower) {
      // upper to lower
      std::pop_heap(_upper.begin(), _upper.end(), std::greater<T>());
      _lower.push_back(_upper.back());
      std::push_heap(_lower.begin(), _lower.end(), std::less<T>());
      _upper.pop_back();
    }            
  }

  /// Access the percentile in O(1)
  const T& get() const {
    return _lower.front();
  }

  void clear() {
    _lower.clear();
    _upper.clear();
  }

private:
  double _percentile;
  std::vector<T> _lower;
  std::vector<T> _upper;
};

2
很好,我最近在面试中也遇到了类似的问题。Nikita已经给出了我的答案。 - Alexandru
1
@Alexandru:相似并不等同于相同 :-) 我认为这里不需要堆解决方案。它可能适用于这个问题:http://stackoverflow.com/questions/2213707/finding-an-appropriate-data-structure/,但我认为在这里使用是错误的。 - Aryabhatta
我认为这段代码存在未定义行为:if (_lower.empty() || x <= _lower.front()) {,因为其求值顺序是未定义的。 - davide
@davide,求值顺序已经定义好了,如果_lower.empty()返回true,则右侧不会被求值。 - martinus
@martinus 你说得对,运算符 &&|| 是一个例外,它们保证了求值的顺序。但是需要注意的是,如果它们被重载为方法,则可能会反转或不保证求值的顺序,但这在这里并不是问题。我将参考 SO 上的这个优秀答案 来解决这个问题。 - davide
6个回答

39
你可以使用两个来完成。不确定是否有更少“人为”的解决方案,但这个方案提供了 O(logn) 时间复杂度,并且堆也包含在大多数编程语言的标准库中。
第一个堆(堆A)包含最小的 75% 元素,另一个堆(堆B)包含其余的(最大的 25%)。第一个堆顶部有最大的元素,第二个堆顶部有最小的元素。
1. 添加元素。 检查新元素 x 是否 <= max(A)。如果是,则将其添加到堆 A 中,否则添加到堆 B 中。 现在,如果我们将 x 添加到堆 A 中并且它变得太大(包含超过 75% 的元素),我们需要从 A 中删除最大的元素(O(logn)),并将其添加到堆 B 中(同样是 O(logn))。 如果堆 B 变得太大,也是类似的。
2. 找到“0.75中位数” 只需从 A 中获取最大的元素(或从 B 中获取最小的元素)。根据堆的实现方式,需要 O(logn) 或 O(1) 时间。

编辑
正如Dolphin所指出的,如果我们想要精确的答案,就需要准确地指定每个n应该有多大的堆。例如,如果size(A) = floor(n * 0.75)并且size(B)是剩下的部分,那么对于每一个n > 0array[array.size * 3/4] = min(B)


1
我认为这个想法可行,但是我认为需要做一些改变。首先,其中一个堆应该始终拥有你正在寻找的项目。这样,你可以确定每个堆在给定元素数量heap A=floor(n*.75) and heap B=ceil(n*.25)(在这种情况下)时应该是什么大小。接下来,当你添加一个项目时,确定哪个堆需要增长。如果堆A需要增长并且该项小于B的顶部,则将其添加到A中。否则,删除B的顶部,将其添加到A中,然后将新项目添加到B中。(删除然后添加作为修改会更有效率)。 - Dolphin
@Dolphin 对不起,我没有完全理解你的建议。你是说算法有错误吗?还是它可以变得更简单或渐进地更快? - Nikita Rybak
@martinus 别忘了,B 中的任何元素都应该大于等于 A 中的任何元素。因此,如果您根据大小选择添加位置,则需要之后比较 max(A) 和 min(B),并在第二个元素较小的情况下交换它们。 - Nikita Rybak
1
@Nikita - 不,只是一些微调。定义哪个堆应该增长会使添加操作稍微简单一些(您的添加可以执行3个O(logn)操作(添加,删除,添加)。我的建议是在最坏情况下进行两次操作(修改,添加)。选择哪个堆并不重要,但始终将项目放在小堆中将使堆的大小更接近,从而获得(可能微不足道的)性能提升。 - Dolphin
@Nikita 哦,现在我知道为什么他们说睡眠是必要的了... :D - Hari Menon
显示剩余6条评论

17

一个简单的顺序统计树就足够了。

这个平衡版本的树支持O(logn)时间的插入/删除和按秩访问。因此,您不仅可以获得75%的百分位数,还可以获得66%、50%或任何您需要的百分位数,而无需更改代码。

如果您经常访问75%的百分位数,但只偶尔进行插入操作,您可以在插入/删除操作期间始终缓存75%的百分位元素。

大多数标准实现(如Java的TreeMap)都是顺序统计树。


1
有用的技巧加1分。但是你犯了一个错误:Java的TreeSet(或Map)不会提供迭代从树根到叶子节点所需的工具。如果我没记错,STL版本也是如此。你必须编写自己的平衡树或者修改别人的代码。这很难让人愉快。 - Nikita Rybak
1
+1 - 但是你不能通过排名来索引Java的TreeSet。如果值不重复,你可以使用Java的TreeSet;你只需要跟踪当前的第75个百分位数以及左边和右边的项目数量。当你添加一个元素时,将其放入集合中并更新左右的数字。如果右边的元素过多,使用higher方法获取下一个元素;如果左边的元素过多,使用lower方法获取前一个元素;如果一切正常,不做任何操作。如果值重复,你需要创建一个从键到某个集合(列表?)的映射,然后类似的技巧也适用。 - Rex Kerr
@Nikita:我并不是在说你必须亲自遍历树。我想说的是,数据结构提供了按位置访问/插入/删除的API。无论如何,我现在对TreeMap也不太确定了... - Aryabhatta
我已经尝试过使用树结构,但是对于我的使用场景,堆实现的速度快了几倍。 - martinus
@martinus:你尝试过缓存吗?不管怎样,很高兴这个论坛对你有帮助 :-) - Aryabhatta
显示剩余4条评论

3
如果您可以接受近似答案,您可以使用直方图代替将整个值存储在内存中。
对于每个新值,请将其添加到适当的箱中。 通过遍历箱并累加计数,计算第75个百分位数,直到达到人口数量的75%。 百分位数的值介于箱的低限和高限之间。
这将提供O(B)复杂度,其中B是箱的数量,即range_size/bin_size。(使用适合您用户案例的bin_size)。
我已经在JVM库中实现了此逻辑:https://github.com/IBM/HBPE,您可以参考它。

-2
这是一个JavaScript解决方案。将其复制粘贴到浏览器控制台中即可运行。$scores 包含得分列表,$percentile给出列表的第n个百分位。因此,第75个百分位为76.8,第99个百分位为87.9。
function get_percentile($percentile, $array) {
    $array = $array.sort();
    $index = ($percentile/100) * $array.length;
    if (Math.floor($index) === $index) {
         $result = ($array[$index-1] + $array[$index])/2;
    }
    else {
        $result = $array[Math.floor($index)];
    }
    return $result;
}

$scores = [22.3, 32.4, 12.1, 54.6, 76.8, 87.3, 54.6, 45.5, 87.9];

get_percentile(75, $scores);
get_percentile(90, $scores);

-2
如果您有已知的值集,以下方法将非常快速:
创建一个大整数数组(甚至字节也可以),其元素数量等于数据的最大值。 例如,如果t的最大值为100,000,则创建一个数组。
int[] index = new int[100000]; // 400kb

现在遍历整个值集合,如下

for each (int t : set_of_values) {
  index[t]++;
}

// You can do a try catch on ArrayOutOfBounds just in case :)

现在按以下方式计算百分位数

int sum = 0, i = 0;
while (sum < 0.9*set_of_values.length) {
  sum += index[i++];
}

return i;

如果值不符合这些限制,您还可以考虑使用TreeMap而不是数组。


这使得插入操作的时间复杂度为O(1),但查找第75个百分位元素的时间复杂度为O(M),其中M是最大值。M可能比N大得多。(此外,请注意OP使用的是双精度浮点值,因此没有希望用合理大小的位图(或重复计数数组)来表示它们)。因此,对于每个部分列表的第75个百分位数列表,总时间复杂度为O(NM)。如果可能值的范围非常小,那么这将是有趣的,但在这里并不是很有帮助。与两堆技巧相比,我不会称其为“非常快”。 - Peter Cordes
我不明白为什么这个答案会被踩。即使值是浮点数,如果它们的分布已知,谨慎地分组可以得到非常准确的结果。如果你能把M降低到足够低,与O(n log(n))相比,它可以非常快速,特别是考虑到操作非常简单和快速(浮点数加法,索引)。此外,由于添加一个数字的时间复杂度是O(1),如果你不需要每次添加一个数字就获取百分位数的更新值,你就可以在堆上节省很多log(n)的查找时间。由于OP正在寻求速度,因此这值得考虑。 - Pepe Mandioca

-2

你可以使用二分查找在O(log n)的时间复杂度内找到正确的位置。然而,将数组向上移动仍然是O(n)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接