How to update elements within a heap? (priority queue)
Asked Answered
R

3

46

When using a min/max-heap algorithm, priorities may change. One way to handle this is to removal and insert the element to update the queue order.

For priority queues implemented using arrays, this can be a performance bottleneck that seems avoidable, especially for cases when the change to priority is small.

Even if this is not a standard operation for a priority queue, this is a custom implementation which can be modified for my needs.

Are there well known best-practice methods for updating elements in the min/max-heap?


Background Info: I'm no expert in binary-trees, I inherited some code that has a performance bottleneck re-inserting elements in a priority queue. I've made a re-insertion function for the min-heap that re-orders the new element - which gives a measurable improvement over (remove & insert), however this seems the kind of problem that others may have solved in a more elegant way.

I could link to the code if it helps but would rather not focus too much on implementation details - since this Q&A can probably be kept general.

Ronaronal answered 29/10, 2017 at 1:31 Comment(7)
Is there any particular pattern to how the priorities change? Also, how frequent are priority changes relative to other operations like insertions and deletions?Breana
Priorities change many times more than insertion or removal, the whole purpose of the min-heap (in this case), is to track the changes while popping the best. Although not sure this is relevant to the question? (even if it wasn't the bottleneck - it may be helpful to optimize for this case)Ronaronal
When the priorities change, is there any pattern to it? Like, do they always increase, always decrease, etc.?Breana
No, although they might not change by much (re-ordering might not even be needed), this seems like something that could be optimized for. Even if it was following some patterns/constraints, I think that might be a different question.Ronaronal
How many times do you end up dequeuing relative to the number of updates?Breana
Not sure how this would change the answer - but in this case - roughly 10x more updates to de-queuing.Ronaronal
I posted an answer below with a detailed investigation of various algorithmic options for an "ideally simple use case", which shows "how we can beat the c++ STL"Highstepper
B
57

Typical Solution

The usual solution is to mark an element as invalid and insert a new element, then eliminate the invalid entries as they are popped-off.

Alternative Solution

If that approach doesn't suffice, it is possible restore the min-heap invariant in O(log n) steps as long as the location of the value being changed is known.

Recall that min-heaps are built and maintained using two primitives, "siftup" and "siftdown" (though various sources have differing opinions on which is up and which is down). One of those pushes values down the tree and the other floats them upward.

Case 1: Value is increased

If the new value x1 is greater than the old value x0, then only the tree under x needs to be fixed because parent(x) <= x0 < x1. Just push x down the tree by swapping x with the smaller of its two children while x is bigger than one of its children.

Case 2: Value is decreased

If the new value x1 is less than the old value x, the tree below x needs no adjustment because x1 < x0 <= either_child(x). Instead, we just need to move upward, swapping x with its parent while x is less than its parent. Sibling nodes need not be considered because they are already greater than or equal to a parent which will potentially be replaced with a lower value.

Case 3: Value is unchanged

No work is necessary. The existing invariants are unchanged.

Working code in Python

Test 1,000,000 trials: Create a random heap. Alter a randomly selected value. Restore the heap condition. Verify that the result is a min-heap.

from heapq import _siftup, _siftdown, heapify
from random import random, randrange, choice

def is_minheap(arr):
    return all(arr[i] >= arr[(i-1)//2] for i in range(1, len(arr)))

n = 40
trials = 1_000_000
for _ in range(trials):

    # Create a random heap
    data = [random() for i in range(n)]
    heapify(data)

    # Randomly alter a heap element
    i = randrange(n)
    x0 = data[i]
    x1 = data[i] = choice(data)

    # Restore the heap
    if x1 > x0:                 # value is increased
        _siftup(data, i)
    elif x1 < x0:               # value is decreased
        _siftdown(data, 0, i)

    # Verify the results
    assert is_minheap(data), direction
Bendite answered 29/10, 2017 at 4:22 Comment(13)
Interesting & good to know, although I'm not sure I could use this in my case since elements are updated frequently on a large data-set, which could give a large and unpredictable worst-case outcome. Maybe a detail - but I'm using fixed size allocations, so the possibility to re-alloc potentially large arrays isn't so appealing.Ronaronal
Great comprehensive answer (updated since my last comment), I'm using this in production code and it's given a significant speedup.Ronaronal
I've never seen the "typical solution" used. The asymptotic complexity of both solutions is O(n) if you don't know where the item is in the heap. If you do know where the item is, then the "typical solution" is kind of silly because, as you've shown, re-adjusting the heap is trivial.Historian
@jim-mischel, right, not to mention the penalty for having unused nodes in the heap which need to be sifted through when performing any changes to the heap. Are there good examples of open-source heap API's which support re-adjusting? (I'm writing my own but am interested to see how others do this in production code).Ronaronal
I don't know if any such APIs that are public. I wrote my own in C# several years ago. It used a Dictionary to keep track of items' positions in the heap.Historian
Looks like the up and down is inverted in the Python world, right?Susquehanna
@Susquehanna I presume they are much older, look up heapsort from 1964.Oxygenate
The asymptotic runtime of the no DECREASE-KEY solution is the same: https://mcmap.net/q/15896/-why-does-dijkstra-39-s-algorithm-use-decrease-key and may be even faster in practice: https://mcmap.net/q/15896/-why-does-dijkstra-39-s-algorithm-use-decrease-keyMazman
I posted an answer below with a detailed investigation of various algorithmic options for an "ideally simple use case", which shows "how we can beat the c++ STL"Highstepper
@JimMischel How do you figure the "typical" solution is O(n)? "Marking an element as invalid" can be done in many different ways, many of which are constant time. And then inserting the extra node takes O(lgn) time.Garget
@Eknoma: as I said in my comment, the complexity is O(n) if you don't know where the item is in the heap. Marking an element as invalid is O(1). Finding an arbitrary element in the heap so that you can mark it is O(n).Historian
@JimMischel But why do you have to find the element to mark it as invalid?Garget
@Garget Typically the task is "mark the element with key X (or whatever) as invalid." If you already have a reference to the element then you don't need to find it. But in most situations I'm aware of all you have is the key and you have to find the item in the heap.Historian
R
7

Posting answer to own question since it includes links to working code.


This is in fact quite simple.

Typically a min-heap implementation has functions for ordering, see example: BubbleUp/Down.

These functions can run on the modified element, depending on the change relative to the current value. eg:

if new_value < old_value {
    heap_bubble_up(heap, node);
} else if new_value > old_value {
    heap_bubble_down(heap, node);
}

While the number of operations depends on the distribution of values, this will be equal or fewer steps then maintaining a sorted list.

In general, small changes are _much_ more efficient than a remove+insert.

See working code, and test, which implements a min-heap with insert/remove/re-prioritize, without an initial lookup (the caller stores an opaque reference).


Even re-ordering only the required elements may end up being many operations for a large heap.

If this is too inefficient, a minimum heap may not a a good fit.

A binary-tree might be better (red-black tree for example), where removal and insertion scale better.

However I'm not sure about an rb-trees ability to re-order in-place, as a min-heap can do.

Ronaronal answered 29/10, 2017 at 8:14 Comment(6)
The expensive part of changing the priority of an item in a binary heap is finding the item's location in the heap. That's typically an O(n) operation, unless you have a secondary data structure that keeps track of the index of every item. Maintaining that data structure requires that you update it every time you move an item, but it allows you to locate an item in O(1). Changing the priority after you've located the item requires at most O(log n) steps. As you've noted, the actual number of steps required depends, in general, on the magnitude of the change.Historian
This depends on the implementation, check the code linked - users of the API store an opaque reference to the node, allowing for fast removal and re-prioritization. In fact there is no search function (which would indeed be slower especially if it isn't balanced).Ronaronal
Yes, it depends on the implementation. Their heap item node contains the index value. That qualifies as a secondary data structure that keeps track of the index, and puts additional burden on the client to dereference the node to get to their actual data. But it does work well. A binary heap is always balanced, but it's not ordered for efficient search, so any implementation that doesn't maintain that index makes changing priority or removing arbitrary nodes rather expensive.Historian
Nice implementation. In my case I wanted to minimise comparisons, when updating the top element (never anything else). So using the c++ heap utilities I was doing: std::pop_heap(h.begin(), h.end(), comp); h.pop_back(); h.push_back(newval); std::push_heap(h.begin(), h.end(), comp);. I thought I could beat this on number of comparisons by updating the top element and then calling code similar to your heap_down on that top element (I tried a recursive and an iterative implementation). Result: more comparisons than the above in all cases.Highstepper
The "Magic" seems to be in libstdc++ internal implementation of __adjust_heap() ... I haven't really understood what this does exactly, but it gets the job done with VERY few comparisons.Highstepper
I posted an answer below with a detailed investigation, which shows "how we can beat the c++ STL"Highstepper
P
2

For those interested in the relative performance of the various algorithms I ran a little investigation.

Focus

  1. Relatively small heaps (3 => 100 elements, but easily adjustable in code below)
  2. Operation: Update the heap.top() element ONLY
  3. Optimisation criteria = number of comparisons to re-heapify

The canonical way to do this using the c++ STL is:

  // Note I am using a max-heap, but the comparator is customisable below
  std::pop_heap(heap.begin(), heap.end(), comp);
  heap.pop_back();
  heap.push_back(newval);
  std::push_heap(heap.begin(), heap.end(), comp);

This is a very simple case. No question of "knowing where the element is in the heap". So surely we can do better, right...?

Note: For those interested my application is a Merge sort, where a large file of data has been broken into 20-50 chunks sorted and written to disk. Then re-read and merged into the final sorted file. It turns out that picking which file to merge the next element from is the bottleneck, so I use a std::priority_queue, which uses a heap underneath. However this is still the bottleneck and it's hard to keep up with the disk ,and is CPU bound on the comparisons.

The code below studies 4 implementations:

  1. STL way as above (using libstdc++-11). This is the base case. (Note that I belive libc++ is less sophisticated in this area).
  2. A recursive "bubble down" algo
  3. An iterative "bubble down" algo
  4. A libstd++ __adjust_heap followed by __push_heap algo

I generate random heap sizes, contents and replacement value and iterate 1M times.

Findings

  1. It turns out that the "Case 1, 2, 3 pre-comparision" from Raymond Hettinger's answer always has more comparisons on average. So I took it out and just ran each algo directly.
  2. The STL is Good
  3. The recursive and iterative bubble down alogs are nearly identical (expected, because the compile on -O3 tailall optimises the recursion away, anyway) and consistently have more comparisons than the STL even for this very specialised case. So just using a heap-primitive doesn't give you any gains.
  4. We can beat the STL using... the STL , by copying out the code of the unpublished functions, "cleaning them up" (of underscores etc) and using them in a specialised way to solve this limited, specialised problem. Gains are in the order of 10-20%.

Typical results:

method                              avg_cmp_cnt
std::heap / std::priority_queue     7.568285
bubble up recursively               8.031054
bubble up iteratively               8.047352
libstc++ __adjust_heap              6.327297

Test code:

#include "fmt/core.h"
#include <algorithm>
#include <cassert>
#include <concepts>
#include <cstddef>
#include <cstdlib>
#include <execution>
#include <iostream>
#include <random>
#include <stdexcept>

template <std::unsigned_integral T>
T log2(T x) {
  T log = 0;
  while (x >>= 1U) ++log;
  return log;
}

template <typename T, typename Comp = std::less<>>
void print_heap(const std::vector<T>& heap, Comp comp = {}) {
  std::size_t levels = log2(heap.size()) + 1;
  unsigned    width  = 6 * (1U << (levels - 1U));
  std::cout << "\n\n";
  for (const auto& e: heap) std::cout << e << " ";
  std::cout << "\n";
  std::cout << fmt::format("is_heap = {:}\n\n", std::is_heap(heap.begin(), heap.end(), comp));
  if (heap.empty()) {
    std::cout << "<empty heap>\n";
    return;
  }
  unsigned idx  = 0;
  bool     done = false;
  for (unsigned l = 0; l != levels; ++l) {
    for (unsigned e = 0; e != 1U << l; ++e) {
      std::cout << fmt::format("{:^{}}", heap[idx], width);
      ++idx;
      if (idx == heap.size()) {
        done = true;
        break;
      }
    }
    width /= 2;
    std::cout << "\n\n";
    if (done) break;
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_stl(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  std::pop_heap(heap.begin(), heap.end(), comp);
  heap.pop_back();
  heap.push_back(newval);
  std::push_heap(heap.begin(), heap.end(), comp);
}

template <typename T, typename Comp = std::less<>>
// NOLINTNEXTLINE recursion is tailcall eliminated by compiler
void bubble_down_recursively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
  const auto left  = 2 * i + 1;
  const auto right = 2 * i + 2;
  const auto n     = heap.size();

  using std::swap; // enable ADL

  if (left >= n) { // no children
    return;
  } else if (right >= n) { // left exists right does not.  NOLINT else after return
    if (comp(heap[i], heap[left])) {
      swap(heap[i], heap[left]);
      bubble_down_recursively(heap, left, comp);
    }
  } else { // both children exist
    // 'larger' is only well named if comp = std::less<>{}
    auto larger = comp(heap[right], heap[left]) ? left : right;
    if (comp(heap[i], heap[larger])) {
      swap(heap[i], heap[larger]);
      bubble_down_recursively(heap, larger, comp);
    }
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_recursively(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;
  bubble_down_recursively(heap, 0, comp);
}

template <typename T, typename Comp = std::less<>>
void bubble_down_iteratively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
  const auto n = heap.size();

  while (true) {
    const std::size_t left  = 2 * i + 1;
    const std::size_t right = 2 * i + 2;

    std::size_t largest = i;

    if ((left < n) && comp(heap[largest], heap[left])) {
      largest = left;
    }
    if ((right < n) && comp(heap[largest], heap[right])) {
      largest = right;
    }

    if (largest == i) {
      break;
    }

    using std::swap; // enable ADL
    swap(heap[i], heap[largest]);
    i = largest;
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_iteratively(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;                       // stick it in anyway
  bubble_down_iteratively(heap, 0, comp); // and fix the heap
}

// borrowed from libstdc++ __push_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void push_heap(RandomAccessIterator first, Distance holeIndex, Distance topIndex,
                         Tp value, Compare& comp) {
  Distance parent = (holeIndex - 1) / 2;
  while (holeIndex > topIndex && comp(*(first + parent), value)) {
    *(first + holeIndex) = *(first + parent);
    holeIndex            = parent;
    parent               = (holeIndex - 1) / 2;
  }
  *(first + holeIndex) = std::move(value);
}

// borrowed from libstdc++ __adjust_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void adjust_heap(RandomAccessIterator first, Distance holeIndex, Distance len, Tp value,
                           Compare comp) {
  const Distance topIndex    = holeIndex;
  Distance       secondChild = holeIndex;
  while (secondChild < (len - 1) / 2) {
    secondChild = 2 * (secondChild + 1);
    if (comp(*(first + secondChild), *(first + (secondChild - 1)))) secondChild--;
    *(first + holeIndex) = *(first + secondChild);
    holeIndex            = secondChild;
  }
  if ((len & 1) == 0 && secondChild == (len - 2) / 2) {
    secondChild          = 2 * (secondChild + 1);
    *(first + holeIndex) = *(first + (secondChild - 1));
    holeIndex            = secondChild - 1;
  }
  push_heap(first, holeIndex, topIndex, value, comp);
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_adjust_heap(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;
  adjust_heap(heap.begin(), 0L, heap.end() - heap.begin(), newval, comp);
}

template <typename T>
struct cmp_counter {
  static std::size_t cmpcount; // NOLINT must be static because STL takes Comp by value
  bool               operator()(T a, T b) {
    ++cmpcount;
    return a < b; // effectively std::less<>{};
  }
  static void reset() { cmpcount = 0; }
};
template <typename T>
std::size_t cmp_counter<T>::cmpcount = 0; // NOLINT global static

int main() {

  using ValueType = int;
  
  struct method {
    using cb_t = void (*)(std::vector<ValueType>&, ValueType, cmp_counter<ValueType>);
    std::string label;
    cb_t        cb;
  };
  auto methods = std::vector<method>{
      {"std::heap / std::priority_queue", &replace_top_using_stl},
      {"bubble up recursively", &replace_top_using_bubble_down_recursively},
      {"bubble up iteratively", &replace_top_using_bubble_down_iteratively},
      {"libstc++ __adjust_heap", &replace_top_using_adjust_heap},
  };

  std::cout << fmt::format("{:35s} {:s}\n", "method", "avg_cmp_cnt");
  for (auto& method: methods) {
    auto prng              = std::mt19937_64(1); // NOLINT fixed seed for repeatability
    auto heap_element_dist = std::uniform_int_distribution<>(1, 1000);
    auto heap_size_dist    = std::uniform_int_distribution<std::size_t>(3, 100);

    const std::size_t number_of_trials = 1'000'000;

    std::size_t total_cmpcount = 0;
    cmp_counter<ValueType> comp;
    for (unsigned i = 0; i != number_of_trials; ++i) {
      std::vector<int> h(heap_size_dist(prng));
      std::generate(h.begin(), h.end(), [&] { return ValueType(heap_element_dist(prng)); });

      std::make_heap(h.begin(), h.end(), comp);

      auto newval = ValueType(heap_element_dist(prng));
      cmp_counter<ValueType>::reset();
      method.cb(h, newval, comp);
      total_cmpcount += cmp_counter<ValueType>::cmpcount;

      if (!std::is_heap(h.begin(), h.end(), comp)) {
        std::cerr << method.label << "NOT A HEAP ANYMORE!!\n";
        return EXIT_FAILURE;
      }
    }
    std::cout << fmt::format("{:35s} {:f}\n", method.label,
                             double(total_cmpcount) / number_of_trials);
  }
}
Platonic answered 8/3, 2022 at 14:42 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.