For those interested in the relative performance of the various algorithms I ran a little investigation.
Focus
- Relatively small heaps (3 => 100 elements, but easily adjustable in code below)
- Operation: Update the
heap.top()
element ONLY
- Optimisation criteria = number of comparisons to re-heapify
The canonical way to do this using the c++ STL is:
// Note I am using a max-heap, but the comparator is customisable below
std::pop_heap(heap.begin(), heap.end(), comp);
heap.pop_back();
heap.push_back(newval);
std::push_heap(heap.begin(), heap.end(), comp);
This is a very simple case. No question of "knowing where the element is in the heap". So surely we can do better, right...?
Note: For those interested my application is a Merge sort, where a large file of data has been broken into 20-50 chunks sorted and written to disk. Then re-read and merged into the final sorted file. It turns out that picking which file to merge the next element from is the bottleneck, so I use a std::priority_queue
, which uses a heap underneath. However this is still the bottleneck and it's hard to keep up with the disk ,and is CPU bound on the comparisons.
The code below studies 4 implementations:
- STL way as above (using libstdc++-11). This is the base case. (Note that I belive libc++ is less sophisticated in this area).
- A recursive "bubble down" algo
- An iterative "bubble down" algo
- A libstd++
__adjust_heap
followed by __push_heap
algo
I generate random heap sizes, contents and replacement value and iterate 1M times.
Findings
- It turns out that the "Case 1, 2, 3 pre-comparision" from Raymond Hettinger's answer always has more comparisons on average. So I took it out and just ran each algo directly.
- The STL is Good
- The recursive and iterative bubble down alogs are nearly identical (expected, because the compile on
-O3
tailall optimises the recursion away, anyway) and consistently have more comparisons than the STL even for this very specialised case. So just using a heap-primitive doesn't give you any gains.
- We can beat the STL using... the STL , by copying out the code of the unpublished functions, "cleaning them up" (of underscores etc) and using them in a specialised way to solve this limited, specialised problem. Gains are in the order of 10-20%.
Typical results:
method avg_cmp_cnt
std::heap / std::priority_queue 7.568285
bubble up recursively 8.031054
bubble up iteratively 8.047352
libstc++ __adjust_heap 6.327297
Test code:
#include "fmt/core.h"
#include <algorithm>
#include <cassert>
#include <concepts>
#include <cstddef>
#include <cstdlib>
#include <execution>
#include <iostream>
#include <random>
#include <stdexcept>
template <std::unsigned_integral T>
T log2(T x) {
T log = 0;
while (x >>= 1U) ++log;
return log;
}
template <typename T, typename Comp = std::less<>>
void print_heap(const std::vector<T>& heap, Comp comp = {}) {
std::size_t levels = log2(heap.size()) + 1;
unsigned width = 6 * (1U << (levels - 1U));
std::cout << "\n\n";
for (const auto& e: heap) std::cout << e << " ";
std::cout << "\n";
std::cout << fmt::format("is_heap = {:}\n\n", std::is_heap(heap.begin(), heap.end(), comp));
if (heap.empty()) {
std::cout << "<empty heap>\n";
return;
}
unsigned idx = 0;
bool done = false;
for (unsigned l = 0; l != levels; ++l) {
for (unsigned e = 0; e != 1U << l; ++e) {
std::cout << fmt::format("{:^{}}", heap[idx], width);
++idx;
if (idx == heap.size()) {
done = true;
break;
}
}
width /= 2;
std::cout << "\n\n";
if (done) break;
}
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_stl(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
std::pop_heap(heap.begin(), heap.end(), comp);
heap.pop_back();
heap.push_back(newval);
std::push_heap(heap.begin(), heap.end(), comp);
}
template <typename T, typename Comp = std::less<>>
// NOLINTNEXTLINE recursion is tailcall eliminated by compiler
void bubble_down_recursively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
const auto left = 2 * i + 1;
const auto right = 2 * i + 2;
const auto n = heap.size();
using std::swap; // enable ADL
if (left >= n) { // no children
return;
} else if (right >= n) { // left exists right does not. NOLINT else after return
if (comp(heap[i], heap[left])) {
swap(heap[i], heap[left]);
bubble_down_recursively(heap, left, comp);
}
} else { // both children exist
// 'larger' is only well named if comp = std::less<>{}
auto larger = comp(heap[right], heap[left]) ? left : right;
if (comp(heap[i], heap[larger])) {
swap(heap[i], heap[larger]);
bubble_down_recursively(heap, larger, comp);
}
}
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_recursively(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
heap[0] = newval;
bubble_down_recursively(heap, 0, comp);
}
template <typename T, typename Comp = std::less<>>
void bubble_down_iteratively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
const auto n = heap.size();
while (true) {
const std::size_t left = 2 * i + 1;
const std::size_t right = 2 * i + 2;
std::size_t largest = i;
if ((left < n) && comp(heap[largest], heap[left])) {
largest = left;
}
if ((right < n) && comp(heap[largest], heap[right])) {
largest = right;
}
if (largest == i) {
break;
}
using std::swap; // enable ADL
swap(heap[i], heap[largest]);
i = largest;
}
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_iteratively(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
heap[0] = newval; // stick it in anyway
bubble_down_iteratively(heap, 0, comp); // and fix the heap
}
// borrowed from libstdc++ __push_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void push_heap(RandomAccessIterator first, Distance holeIndex, Distance topIndex,
Tp value, Compare& comp) {
Distance parent = (holeIndex - 1) / 2;
while (holeIndex > topIndex && comp(*(first + parent), value)) {
*(first + holeIndex) = *(first + parent);
holeIndex = parent;
parent = (holeIndex - 1) / 2;
}
*(first + holeIndex) = std::move(value);
}
// borrowed from libstdc++ __adjust_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void adjust_heap(RandomAccessIterator first, Distance holeIndex, Distance len, Tp value,
Compare comp) {
const Distance topIndex = holeIndex;
Distance secondChild = holeIndex;
while (secondChild < (len - 1) / 2) {
secondChild = 2 * (secondChild + 1);
if (comp(*(first + secondChild), *(first + (secondChild - 1)))) secondChild--;
*(first + holeIndex) = *(first + secondChild);
holeIndex = secondChild;
}
if ((len & 1) == 0 && secondChild == (len - 2) / 2) {
secondChild = 2 * (secondChild + 1);
*(first + holeIndex) = *(first + (secondChild - 1));
holeIndex = secondChild - 1;
}
push_heap(first, holeIndex, topIndex, value, comp);
}
template <typename T, typename Comp = std::less<>>
void replace_top_using_adjust_heap(std::vector<T>& heap, T newval, Comp comp = {}) {
if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");
assert(std::is_heap(heap.begin(), heap.end(), comp));
heap[0] = newval;
adjust_heap(heap.begin(), 0L, heap.end() - heap.begin(), newval, comp);
}
template <typename T>
struct cmp_counter {
static std::size_t cmpcount; // NOLINT must be static because STL takes Comp by value
bool operator()(T a, T b) {
++cmpcount;
return a < b; // effectively std::less<>{};
}
static void reset() { cmpcount = 0; }
};
template <typename T>
std::size_t cmp_counter<T>::cmpcount = 0; // NOLINT global static
int main() {
using ValueType = int;
struct method {
using cb_t = void (*)(std::vector<ValueType>&, ValueType, cmp_counter<ValueType>);
std::string label;
cb_t cb;
};
auto methods = std::vector<method>{
{"std::heap / std::priority_queue", &replace_top_using_stl},
{"bubble up recursively", &replace_top_using_bubble_down_recursively},
{"bubble up iteratively", &replace_top_using_bubble_down_iteratively},
{"libstc++ __adjust_heap", &replace_top_using_adjust_heap},
};
std::cout << fmt::format("{:35s} {:s}\n", "method", "avg_cmp_cnt");
for (auto& method: methods) {
auto prng = std::mt19937_64(1); // NOLINT fixed seed for repeatability
auto heap_element_dist = std::uniform_int_distribution<>(1, 1000);
auto heap_size_dist = std::uniform_int_distribution<std::size_t>(3, 100);
const std::size_t number_of_trials = 1'000'000;
std::size_t total_cmpcount = 0;
cmp_counter<ValueType> comp;
for (unsigned i = 0; i != number_of_trials; ++i) {
std::vector<int> h(heap_size_dist(prng));
std::generate(h.begin(), h.end(), [&] { return ValueType(heap_element_dist(prng)); });
std::make_heap(h.begin(), h.end(), comp);
auto newval = ValueType(heap_element_dist(prng));
cmp_counter<ValueType>::reset();
method.cb(h, newval, comp);
total_cmpcount += cmp_counter<ValueType>::cmpcount;
if (!std::is_heap(h.begin(), h.end(), comp)) {
std::cerr << method.label << "NOT A HEAP ANYMORE!!\n";
return EXIT_FAILURE;
}
}
std::cout << fmt::format("{:35s} {:f}\n", method.label,
double(total_cmpcount) / number_of_trials);
}
}