如何更新堆中的元素?(优先队列)

44

使用min/max-heap算法时,优先级可能会改变。处理这种情况的一种方法是删除并插入元素以更新队列顺序。

对于使用数组实现的优先级队列,这可能会成为一个性能瓶颈,看起来可以避免,特别是在优先级更改小的情况下。

即使这不是优先级队列的标准操作, 这是一种可以根据我的需求进行修改的自定义实现。

是否有公认的最佳实践方法来更新min/max-heap中的元素?


背景信息:我不是二叉树的专家,我继承了一些代码,其中有一个性能瓶颈,即重新插入优先队列中的元素。我为最小堆制作了一个重新插入函数,对新元素进行重新排序——这比(删除和插入)有明显的改善,但这似乎是其他人可能已经用更优雅的方式解决的问题。

如果需要,我可以提供代码链接,但不想过多关注实现细节——因为这个问答可能可以保持通用。


优先级的变化比插入或删除操作更频繁,而最小堆(在这种情况下)的整个目的是在弹出最佳值时跟踪这些变化。虽然不确定这是否与问题相关?(即使它不是瓶颈- 优化此情况可能会有所帮助) - ideasman42
不,尽管它们可能不会有太大变化(甚至可能不需要重新排序),但这似乎是可以进行优化的事情。即使它遵循某些模式/约束,我认为那可能是一个不同的问题。 - ideasman42
相对于更新次数,您有多少次出队列? - templatetypedef
不确定这会如何改变答案 - 但在这种情况下,大约会有10倍更多的弹出更新。 - ideasman42
我在下面发布了一个答案,详细调查了一个“理想简单用例”的各种算法选项,展示了“我们如何击败C++ STL”。 - Oliver Schönrock
显示剩余2条评论
3个回答

54

典型解决方法

通常的解决方法是将元素标记为无效,并插入一个新元素,然后在弹出无效条目时消除它们。

备选解决方案

如果该方法不够用,则可以在 O(log n) 步骤内恢复最小堆不变式,只要知道正在更改的值的位置。

请回想一下,最小堆使用两个基本操作构建和维护,“siftup” 和 “siftdown”(尽管各种各样的来源对哪个是向上或向下有不同的看法)。其中一个将值推向树下面,另一个则将其向上浮动。

情况1:值增加

如果新值 x1 大于旧值 x0,则只需要修复 x 下面的树,因为 parent(x) ≤ x0 < x1。只需通过将 x 与其两个子节点中较小的那个交换而将 x 向下推到树中,同时 x 大于其子节点之一。

情况2:值减少

如果新值 x1 小于旧值 x,则 x 下面的树不需要调整,因为 x1 < x0 <= either_child(x)。相反,我们只需要向上移动,当 x 小于其父节点时交换 x 和其父节点。无需考虑兄弟节点,因为它们已经大于或等于可能用较小值替换的父节点。

情况3:值不变

不需要任何操作。现有的不变式不会改变。

Python代码示例

测试1,000,000次:创建一个随机堆。更改随机选择的值。恢复堆条件。验证结果是最小堆。

from heapq import _siftup, _siftdown, heapify
from random import random, randrange, choice

def is_minheap(arr):
    return all(arr[i] >= arr[(i-1)//2] for i in range(1, len(arr)))

n = 40
trials = 1_000_000
for _ in range(trials):

    # Create a random heap
    data = [random() for i in range(n)]
    heapify(data)

    # Randomly alter a heap element
    i = randrange(n)
    x0 = data[i]
    x1 = data[i] = choice(data)

    # Restore the heap
    if x1 > x0:                 # value is increased
        _siftup(data, i)
    elif x1 < x0:               # value is decreased
        _siftdown(data, 0, i)

    # Verify the results
    assert is_minheap(data), direction

2
有趣且值得了解,尽管我不确定在我的情况下是否能使用此方法,因为元素在大型数据集上经常更新,这可能会导致大而不可预测的最坏情况。也许这只是一个细节 - 但我正在使用固定大小的分配,因此重新分配潜在大数组的可能性并不那么吸引人。 - ideasman42
1
非常全面的答案(自我上次评论以来已更新),我正在将其用于生产代码中,它显著提高了速度。 - ideasman42
2
我从未见过使用“典型解决方案”。如果您不知道堆中的项在哪里,则两种解决方案的渐近复杂度均为O(n)。如果您知道项在哪里,则“典型解决方案”有点傻,因为正如您所示,重新调整堆是微不足道的。 - Jim Mischel
1
@jim-mischel,没错,更不用说在堆中有未使用的节点时,需要在执行任何堆更改时进行筛选的惩罚了。是否有好的开源堆API示例支持重新调整?(我正在编写自己的代码,但很想看看其他人如何在生产代码中实现这一点)。 - ideasman42
无DECREASE-KEY解决方案的渐近运行时间相同:https://dev59.com/OWox5IYBdhLWcg3wWzJ7#9255681,在实践中甚至可能更快:https://dev59.com/OWox5IYBdhLWcg3wWzJ7#18540646。 - qwr
显示剩余8条评论

7

由于这篇文章包含了链接到可用代码的内容,因此我将为自己的问题发布答案。


实际上,这非常简单。

通常情况下,最小堆实现具有用于排序的函数,请参见示例:BubbleUp/Down

这些函数可以在修改后的元素上运行,具体取决于相对于当前值的更改。例如:

if new_value < old_value {
    heap_bubble_up(heap, node);
} else if new_value > old_value {
    heap_bubble_down(heap, node);
}

虽然操作次数取决于值的分布情况,但与维护排序列表相比,这将等于或少于步骤。
通常情况下,小的更改要比删除+插入高效得多。
请参见代码测试,它实现了一个具有插入/删除/重新设置优先级的最小堆,而无需初始查找(调用者存储不透明引用)。
即使只重新排序所需的元素,对于大型堆来说也可能需要很多操作。
如果这种效率太低,则最小堆可能不适合。
二叉树可能会更好(例如红黑树),其中删除和插入的扩展性更好。
但是,我不确定红黑树能否像最小堆一样就地重新排序

8
在二叉堆中改变一个元素的优先级最费时的部分是找到该元素在堆中的位置。通常需要O(n)时间才能完成,除非你有一个次要数据结构来跟踪每个元素的索引。维护这个数据结构需要在每次移动元素时更新它,但它允许你在O(1) 时间内定位元素。在你定位元素后改变其优先级最多需要O(log n)步。如你所提到的,实际所需步骤取决于更改的幅度大小。 - Jim Mischel
这取决于实现方式,请检查链接的代码 - API 的用户存储一个不透明的节点引用,从而实现快速删除和重新排序。事实上,没有搜索功能(如果它不是平衡的话,确实会更慢)。 - ideasman42
是的,这取决于具体实现。他们的堆元素节点包含索引值。这将作为一个次要的数据结构来跟踪索引,并给客户端带来了额外的负担以取消引用节点以获取实际数据。但它确实运行良好。二叉堆始终是平衡的,但不适合高效搜索排序,所以任何不维护该索引的实现都会使更改优先级或删除任意节点变得相当昂贵。 - Jim Mischel
不错的实现。在我的情况下,我想要最小化比较,当更新顶部元素(从不更新其他任何东西)时。因此,使用c++堆工具,我正在执行:std::pop_heap(h.begin(), h.end(), comp); h.pop_back(); h.push_back(newval); std::push_heap(h.begin(), h.end(), comp);。我认为通过更新顶部元素然后在该顶部元素上调用类似于您的“heap_down”的代码可以在比较数量上击败它(我尝试了递归和迭代实现)。结果:在所有情况下都比上述更多的比较。 - Oliver Schönrock
“Magic”似乎在libstdc++内部实现的__adjust_heap()中。我并没有真正理解它的确切作用,但它可以通过非常少的比较完成工作。 - Oliver Schönrock
我在下面发布了一个答案,详细调查显示“我们如何击败C++ STL”。 - Oliver Schönrock

2

对于那些对各种算法的相对性能感兴趣的人,我进行了一些调查。

重点

  1. 相对较小的堆(3 => 100个元素,但在下面的代码中很容易调整)
  2. 操作:仅更新heap.top()元素而已
  3. 优化标准=重新堆化所需的比较次数

使用c++ STL的规范方法是:

  // Note I am using a max-heap, but the comparator is customisable below
  std::pop_heap(heap.begin(), heap.end(), comp);
  heap.pop_back();
  heap.push_back(newval);
  std::push_heap(heap.begin(), heap.end(), comp);

这是一个非常简单的案例,没有“知道元素在堆中的位置”的问题。所以我们肯定可以做得更好,对吧……?

注意:对于那些感兴趣的人,我的应用程序是归并排序,其中一个大型数据文件被分成20-50个块,排序并写入磁盘。然后重新读取并合并到最终排序文件中。结果发现,选择从哪个文件合并下一个元素是瓶颈,所以我使用了std::priority_queue,它在底层使用了一个堆。然而,这仍然是瓶颈,很难跟上磁盘,并且在比较时受限于CPU。

以下代码研究了4种实现:

  1. STL方式如上(使用libstdc++-11)。这是基本情况。(请注意,我认为libc++在这个领域不太复杂)。
  2. 递归的“向下冒泡”算法
  3. 迭代的“向下冒泡”算法
  4. 一个libstd++ __adjust_heap后跟__push_heap算法

我生成随机堆大小、内容和替换值,并迭代1M次。

结果

  1. 事实证明,Raymond Hettinger的答案中的“Case 1, 2, 3 pre-comparision”总是平均比较更多。所以我把它拿出来,直接运行每个算法。
  2. STL很好
  3. 递归和迭代的向下冒泡算法几乎相同(预期的,因为编译器在-O3 tailall优化时会自动优化递归),即使对于这个非常专业的情况,它们始终比STL有更多的比较。因此,仅仅使用堆原语并没有给你带来任何收益。
  4. 我们可以通过复制未发布函数的代码,“清理”它们(去掉下划线等),并以特定的方式使用它们来解决这个受限、专业的问题,从而击败STL。收益在10-20%左右。

典型结果:

method                              avg_cmp_cnt
std::heap / std::priority_queue     7.568285
bubble up recursively               8.031054
bubble up iteratively               8.047352
libstc++ __adjust_heap              6.327297

测试代码:

#include "fmt/core.h"
#include <algorithm>
#include <cassert>
#include <concepts>
#include <cstddef>
#include <cstdlib>
#include <execution>
#include <iostream>
#include <random>
#include <stdexcept>

template <std::unsigned_integral T>
T log2(T x) {
  T log = 0;
  while (x >>= 1U) ++log;
  return log;
}

template <typename T, typename Comp = std::less<>>
void print_heap(const std::vector<T>& heap, Comp comp = {}) {
  std::size_t levels = log2(heap.size()) + 1;
  unsigned    width  = 6 * (1U << (levels - 1U));
  std::cout << "\n\n";
  for (const auto& e: heap) std::cout << e << " ";
  std::cout << "\n";
  std::cout << fmt::format("is_heap = {:}\n\n", std::is_heap(heap.begin(), heap.end(), comp));
  if (heap.empty()) {
    std::cout << "<empty heap>\n";
    return;
  }
  unsigned idx  = 0;
  bool     done = false;
  for (unsigned l = 0; l != levels; ++l) {
    for (unsigned e = 0; e != 1U << l; ++e) {
      std::cout << fmt::format("{:^{}}", heap[idx], width);
      ++idx;
      if (idx == heap.size()) {
        done = true;
        break;
      }
    }
    width /= 2;
    std::cout << "\n\n";
    if (done) break;
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_stl(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  std::pop_heap(heap.begin(), heap.end(), comp);
  heap.pop_back();
  heap.push_back(newval);
  std::push_heap(heap.begin(), heap.end(), comp);
}

template <typename T, typename Comp = std::less<>>
// NOLINTNEXTLINE recursion is tailcall eliminated by compiler
void bubble_down_recursively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
  const auto left  = 2 * i + 1;
  const auto right = 2 * i + 2;
  const auto n     = heap.size();

  using std::swap; // enable ADL

  if (left >= n) { // no children
    return;
  } else if (right >= n) { // left exists right does not.  NOLINT else after return
    if (comp(heap[i], heap[left])) {
      swap(heap[i], heap[left]);
      bubble_down_recursively(heap, left, comp);
    }
  } else { // both children exist
    // 'larger' is only well named if comp = std::less<>{}
    auto larger = comp(heap[right], heap[left]) ? left : right;
    if (comp(heap[i], heap[larger])) {
      swap(heap[i], heap[larger]);
      bubble_down_recursively(heap, larger, comp);
    }
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_recursively(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;
  bubble_down_recursively(heap, 0, comp);
}

template <typename T, typename Comp = std::less<>>
void bubble_down_iteratively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
  const auto n = heap.size();

  while (true) {
    const std::size_t left  = 2 * i + 1;
    const std::size_t right = 2 * i + 2;

    std::size_t largest = i;

    if ((left < n) && comp(heap[largest], heap[left])) {
      largest = left;
    }
    if ((right < n) && comp(heap[largest], heap[right])) {
      largest = right;
    }

    if (largest == i) {
      break;
    }

    using std::swap; // enable ADL
    swap(heap[i], heap[largest]);
    i = largest;
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_iteratively(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;                       // stick it in anyway
  bubble_down_iteratively(heap, 0, comp); // and fix the heap
}

// borrowed from libstdc++ __push_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void push_heap(RandomAccessIterator first, Distance holeIndex, Distance topIndex,
                         Tp value, Compare& comp) {
  Distance parent = (holeIndex - 1) / 2;
  while (holeIndex > topIndex && comp(*(first + parent), value)) {
    *(first + holeIndex) = *(first + parent);
    holeIndex            = parent;
    parent               = (holeIndex - 1) / 2;
  }
  *(first + holeIndex) = std::move(value);
}

// borrowed from libstdc++ __adjust_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void adjust_heap(RandomAccessIterator first, Distance holeIndex, Distance len, Tp value,
                           Compare comp) {
  const Distance topIndex    = holeIndex;
  Distance       secondChild = holeIndex;
  while (secondChild < (len - 1) / 2) {
    secondChild = 2 * (secondChild + 1);
    if (comp(*(first + secondChild), *(first + (secondChild - 1)))) secondChild--;
    *(first + holeIndex) = *(first + secondChild);
    holeIndex            = secondChild;
  }
  if ((len & 1) == 0 && secondChild == (len - 2) / 2) {
    secondChild          = 2 * (secondChild + 1);
    *(first + holeIndex) = *(first + (secondChild - 1));
    holeIndex            = secondChild - 1;
  }
  push_heap(first, holeIndex, topIndex, value, comp);
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_adjust_heap(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;
  adjust_heap(heap.begin(), 0L, heap.end() - heap.begin(), newval, comp);
}

template <typename T>
struct cmp_counter {
  static std::size_t cmpcount; // NOLINT must be static because STL takes Comp by value
  bool               operator()(T a, T b) {
    ++cmpcount;
    return a < b; // effectively std::less<>{};
  }
  static void reset() { cmpcount = 0; }
};
template <typename T>
std::size_t cmp_counter<T>::cmpcount = 0; // NOLINT global static

int main() {

  using ValueType = int;
  
  struct method {
    using cb_t = void (*)(std::vector<ValueType>&, ValueType, cmp_counter<ValueType>);
    std::string label;
    cb_t        cb;
  };
  auto methods = std::vector<method>{
      {"std::heap / std::priority_queue", &replace_top_using_stl},
      {"bubble up recursively", &replace_top_using_bubble_down_recursively},
      {"bubble up iteratively", &replace_top_using_bubble_down_iteratively},
      {"libstc++ __adjust_heap", &replace_top_using_adjust_heap},
  };

  std::cout << fmt::format("{:35s} {:s}\n", "method", "avg_cmp_cnt");
  for (auto& method: methods) {
    auto prng              = std::mt19937_64(1); // NOLINT fixed seed for repeatability
    auto heap_element_dist = std::uniform_int_distribution<>(1, 1000);
    auto heap_size_dist    = std::uniform_int_distribution<std::size_t>(3, 100);

    const std::size_t number_of_trials = 1'000'000;

    std::size_t total_cmpcount = 0;
    cmp_counter<ValueType> comp;
    for (unsigned i = 0; i != number_of_trials; ++i) {
      std::vector<int> h(heap_size_dist(prng));
      std::generate(h.begin(), h.end(), [&] { return ValueType(heap_element_dist(prng)); });

      std::make_heap(h.begin(), h.end(), comp);

      auto newval = ValueType(heap_element_dist(prng));
      cmp_counter<ValueType>::reset();
      method.cb(h, newval, comp);
      total_cmpcount += cmp_counter<ValueType>::cmpcount;

      if (!std::is_heap(h.begin(), h.end(), comp)) {
        std::cerr << method.label << "NOT A HEAP ANYMORE!!\n";
        return EXIT_FAILURE;
      }
    }
    std::cout << fmt::format("{:35s} {:f}\n", method.label,
                             double(total_cmpcount) / number_of_trials);
  }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接