对于那些对各种算法的相对性能感兴趣的人,我进行了一些调查。
重点
- 相对较小的堆(3 => 100个元素,但在下面的代码中很容易调整)
- 操作:仅更新
heap.top()
元素而已
- 优化标准=重新堆化所需的比较次数
使用c++ STL的规范方法是:
std::pop_heap(heap.begin(), heap.end(), comp);
heap.pop_back();
heap.push_back(newval);
std::push_heap(heap.begin(), heap.end(), comp);
这是一个非常简单的案例,没有“知道元素在堆中的位置”的问题。所以我们肯定可以做得更好,对吧……?
注意:对于那些感兴趣的人,我的应用程序是归并排序,其中一个大型数据文件被分成20-50个块,排序并写入磁盘。然后重新读取并合并到最终排序文件中。结果发现,选择从哪个文件合并下一个元素是瓶颈,所以我使用了std::priority_queue
,它在底层使用了一个堆。然而,这仍然是瓶颈,很难跟上磁盘,并且在比较时受限于CPU。
以下代码研究了4种实现:
- STL方式如上(使用libstdc++-11)。这是基本情况。(请注意,我认为libc++在这个领域不太复杂)。
- 递归的“向下冒泡”算法
- 迭代的“向下冒泡”算法
- 一个libstd++
__adjust_heap
后跟__push_heap
算法
我生成随机堆大小、内容和替换值,并迭代1M次。
结果
- 事实证明,Raymond Hettinger的答案中的“Case 1, 2, 3 pre-comparision”总是平均比较更多。所以我把它拿出来,直接运行每个算法。
- STL很好
- 递归和迭代的向下冒泡算法几乎相同(预期的,因为编译器在
-O3
tailall优化时会自动优化递归),即使对于这个非常专业的情况,它们始终比STL有更多的比较。因此,仅仅使用堆原语并没有给你带来任何收益。
- 我们可以通过复制未发布函数的代码,“清理”它们(去掉下划线等),并以特定的方式使用它们来解决这个受限、专业的问题,从而击败STL。收益在10-20%左右。
典型结果:
method avg_cmp_cnt
std::heap / std::priority_queue 7.568285
bubble up recursively 8.031054
bubble up iteratively 8.047352
libstc++ __adjust_heap 6.327297
测试代码:
#include "fmt/core.h"
#include <algorithm>
#include <cassert>
#include <concepts>
#include <cstddef>
#include <cstdlib>
#include <execution>
#include <iostream>
#include <random>
#include <stdexcept>
template <std::unsigned_integral T>
T log2(T x) {
T log = 0;
while (x >>= 1U) ++log;
return log;
}
template <typename T, typename Comp = std::less<>>
void print_heap(const std::vector<T>& heap, Comp comp = {}) {
std::size_t levels = log2(heap.size()) + 1;
unsigned width = 6 * (1U << (levels - 1U));
std::cout << "\n\n";
for (const auto& e: heap) std::cout << e << " ";
std::cout << "\n";
std::cout << fmt::format("is_heap = {:}\n\n", std::is_heap(heap.begin(), heap.end(), comp));
if (heap.empty()) {
std::cout << "<empty heap>\n";
return;
}
unsigned idx = 0;
bool done = false;
for (unsigned l = 0; l != levels; ++l) {
for (unsigned e = 0; e != 1U << l; ++e) {
std::cout << fmt::format("{:^{}}", heap[idx], width);
++idx;
if (idx == heap.size()) {
done = true;
break;
}
}
width /= 2;
std::cout << "\n\n";
if (done) break;
}
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_stl(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
std::pop_heap(heap.begin(), heap.end(), comp);
heap.pop_back();
heap.push_back(newval);
std::push_heap(heap.begin(), heap.end(), comp);
}
template <typename T, typename Comp = std::less<>>
void bubble_down_recursively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
const auto left = 2 * i + 1;
const auto right = 2 * i + 2;
const auto n = heap.size();
using std::swap;
if (left >= n) {
return;
} else if (right >= n) {
if (comp(heap[i], heap[left])) {
swap(heap[i], heap[left]);
bubble_down_recursively(heap, left, comp);
}
} else {
auto larger = comp(heap[right], heap[left]) ? left : right;
if (comp(heap[i], heap[larger])) {
swap(heap[i], heap[larger]);
bubble_down_recursively(heap, larger, comp);
}
}
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_recursively(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
heap[0] = newval;
bubble_down_recursively(heap, 0, comp);
}
template <typename T, typename Comp = std::less<>>
void bubble_down_iteratively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
const auto n = heap.size();
while (true) {
const std::size_t left = 2 * i + 1;
const std::size_t right = 2 * i + 2;
std::size_t largest = i;
if ((left < n) && comp(heap[largest], heap[left])) {
largest = left;
}
if ((right < n) && comp(heap[largest], heap[right])) {
largest = right;
}
if (largest == i) {
break;
}
using std::swap;
swap(heap[i], heap[largest]);
i = largest;
}
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_iteratively(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
heap[0] = newval;
bubble_down_iteratively(heap, 0, comp);
}
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void push_heap(RandomAccessIterator first, Distance holeIndex, Distance topIndex,
Tp value, Compare& comp) {
Distance parent = (holeIndex - 1) / 2;
while (holeIndex > topIndex && comp(*(first + parent), value)) {
*(first + holeIndex) = *(first + parent);
holeIndex = parent;
parent = (holeIndex - 1) / 2;
}
*(first + holeIndex) = std::move(value);
}
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void adjust_heap(RandomAccessIterator first, Distance holeIndex, Distance len, Tp value,
Compare comp) {
const Distance topIndex = holeIndex;
Distance secondChild = holeIndex;
while (secondChild < (len - 1) / 2) {
secondChild = 2 * (secondChild + 1);
if (comp(*(first + secondChild), *(first + (secondChild - 1)))) secondChild--;
*(first + holeIndex) = *(first + secondChild);
holeIndex = secondChild;
}
if ((len & 1) == 0 && secondChild == (len - 2) / 2) {
secondChild = 2 * (secondChild + 1);
*(first + holeIndex) = *(first + (secondChild - 1));
holeIndex = secondChild - 1;
}
push_heap(first, holeIndex, topIndex, value, comp);
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_adjust_heap(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
heap[0] = newval;
adjust_heap(heap.begin(), 0L, heap.end() - heap.begin(), newval, comp);
}
template <typename T>
struct cmp_counter {
static std::size_t cmpcount;
bool operator()(T a, T b) {
++cmpcount;
return a < b;
}
static void reset() { cmpcount = 0; }
};
template <typename T>
std::size_t cmp_counter<T>::cmpcount = 0;
int main() {
using ValueType = int;
struct method {
using cb_t = void (*)(std::vector<ValueType>&, ValueType, cmp_counter<ValueType>);
std::string label;
cb_t cb;
};
auto methods = std::vector<method>{
{"std::heap / std::priority_queue", &replace_top_using_stl},
{"bubble up recursively", &replace_top_using_bubble_down_recursively},
{"bubble up iteratively", &replace_top_using_bubble_down_iteratively},
{"libstc++ __adjust_heap", &replace_top_using_adjust_heap},
};
std::cout << fmt::format("{:35s} {:s}\n", "method", "avg_cmp_cnt");
for (auto& method: methods) {
auto prng = std::mt19937_64(1);
auto heap_element_dist = std::uniform_int_distribution<>(1, 1000);
auto heap_size_dist = std::uniform_int_distribution<std::size_t>(3, 100);
const std::size_t number_of_trials = 1'000'000;
std::size_t total_cmpcount = 0;
cmp_counter<ValueType> comp;
for (unsigned i = 0; i != number_of_trials; ++i) {
std::vector<int> h(heap_size_dist(prng));
std::generate(h.begin(), h.end(), [&] { return ValueType(heap_element_dist(prng)); });
std::make_heap(h.begin(), h.end(), comp);
auto newval = ValueType(heap_element_dist(prng));
cmp_counter<ValueType>::reset();
method.cb(h, newval, comp);
total_cmpcount += cmp_counter<ValueType>::cmpcount;
if (!std::is_heap(h.begin(), h.end(), comp)) {
std::cerr << method.label << "NOT A HEAP ANYMORE!!\n";
return EXIT_FAILURE;
}
}
std::cout << fmt::format("{:35s} {:f}\n", method.label,
double(total_cmpcount) / number_of_trials);
}
}