Find running median from a stream of integers
M

10

255

Possible Duplicate:
Rolling median algorithm in C

Integers are read from a data stream. Find the median of the elements read so far in an efficient way.

Solution I have read: We can use a max-heap on the left side to represent elements that are less than the effective median, and a min-heap on the right side to represent elements that are greater than the effective median.

After processing an incoming element, the number of elements in the heaps differs by at most 1. When both heaps contain the same number of elements, we take the average of the two root values as the effective median. When the heaps are not balanced, we take the effective median from the root of the heap containing more elements.

But how would we construct a max-heap and a min-heap here, i.e. how would we know the effective median? I think we would insert 1 element into the max-heap and then the next 1 element into the min-heap, and so on for all the elements. Correct me if I am wrong here.

Munroe answered 18/5, 2012 at 17:56 Comment(6)
Clever algorithm, using heaps. From the title I couldn't immediately think of a solution.Omarr
vizier's solution looks good to me, except that I was assuming (though you did not state) that this stream could be arbitrarily long, so you couldn't keep everything in memory. Is that the case?Izettaizhevsk
@RunningWild For arbitrarily long streams, you could get the median of the last N elements by using Fibonacci heaps (so you get log(N) deletes) and storing pointers to inserted elements in order (in e.g. a deque), then removing the oldest element at each step once the heaps are full (maybe also moving things from one heap to the other). You could get somewhat better than N by storing the numbers of repeated elements (if there are lots of repeats), but in general, I think you have to make some kind of distributional assumptions if you want the median of the whole stream.Leisha
You can start with both heaps empty. First int goes in one heap; second goes either in the other, or you move the first item to the other heap and then insert. This generalizes to "don't allow one heap to go bigger than the other +1" and no special casing is needed (the "root value" of an empty heap can be defined as 0)Glanti
I JUST got this question on a MSFT interview. Thank you for postingNadenenader
Reopened because the proposed duplicate is asking specifically for an efficient implementation, while this is more about the general approach. Also, top-voted answer here has well over ten times the score of the top-voted answer in the duplicate, which means, if anything, the other post should be the one that should be closed, or the posts should be merged.Sansbury
A
412

There are a number of different solutions for finding the running median from streamed data; I will briefly talk about them at the very end of the answer.

The question is about the details of a specific solution (the max-heap/min-heap solution), and how the heap-based solution works is explained below:

For the first two elements, add the smaller one to the maxHeap on the left and the bigger one to the minHeap on the right. Then process the stream data one by one:

Step 1: Add the next item to one of the heaps

   if the next item is smaller than the maxHeap root, add it to the maxHeap;
   otherwise, add it to the minHeap

Step 2: Balance the heaps (after this step the heaps will either be balanced or
   one of them will contain 1 more item)

   if the number of elements in one of the heaps is greater than the other by
   more than 1, remove the root element from the heap containing more elements
   and add it to the other one

Then at any given time you can calculate the median like this:

   If the heaps contain an equal number of elements:
     median = (root of maxHeap + root of minHeap)/2
   Else
     median = root of the heap with more elements
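
To make the above concrete, here is a minimal sketch in Python using the standard heapq module. heapq only provides a min-heap, so the max-heap side stores negated values. The StreamMedian class name is illustrative, and instead of the two-element bootstrap described above, the very first element simply goes to the min-heap side (the balance step makes the two bootstraps equivalent):

import heapq

class StreamMedian:
    def __init__(self):
        self.max_heap = []  # lower half, values negated (heapq is a min-heap)
        self.min_heap = []  # upper half

    def add(self, num):
        # Step 1: add the item to one of the heaps
        if self.max_heap and num < -self.max_heap[0]:
            heapq.heappush(self.max_heap, -num)
        else:
            heapq.heappush(self.min_heap, num)
        # Step 2: rebalance so the heap sizes differ by at most 1
        if len(self.max_heap) > len(self.min_heap) + 1:
            heapq.heappush(self.min_heap, -heapq.heappop(self.max_heap))
        elif len(self.min_heap) > len(self.max_heap) + 1:
            heapq.heappush(self.max_heap, -heapq.heappop(self.min_heap))

    def median(self):
        if len(self.max_heap) == len(self.min_heap):
            return (-self.max_heap[0] + self.min_heap[0]) / 2
        if len(self.max_heap) > len(self.min_heap):
            return -self.max_heap[0]
        return self.min_heap[0]

For example, feeding the stream 5, 15, 1, 3 into StreamMedian yields the running medians 5, 10.0, 5, 4.0.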

Now I will talk about the problem in general, as promised at the beginning of the answer. Finding the running median from a stream of data is a tough problem, and finding an exact solution efficiently under memory constraints is probably impossible for the general case. On the other hand, if the data has some characteristics we can exploit, we can develop efficient specialized solutions. For example, if we know that the data is an integral type, we can use counting sort, which gives a constant-memory, constant-time algorithm. The heap-based solution is more general because it can be used for other data types (e.g. doubles) as well. And finally, if the exact median is not required and an approximation is enough, you can estimate a probability density function for the data and compute the median from that.

Advocate answered 18/5, 2012 at 18:15 Comment(8)
These heaps grow without bound (i.e. a 100 element window sliding over 10 million elements would require the 10 million elements to all be stored in memory). See below for another answer using indexable skiplists that only requires the most recently seen 100 elements be kept in memory.Williamsen
You can have a bounded memory solution using heaps as well, as explained in one of the comments to the question itself.Advocate
You can find an implementation of the heap-based solution in C here.Smacker
@Smacker Do you know where I can get the Java implementation of this heap-based solution?Eleventh
Wow, this helped me not only in solving this specific problem but also in learning about heaps. Here is my basic implementation in Python: github.com/PythonAlgo/DataStructShivaree
You can find a C++ implementation here code.geeksforgeeks.org/8eO055Concordia
@HakanSerce Can you please explain why we did what we did? I mean I can see this works, but I am not able to understand it intuitively.Until
I got this question in an interview as well. I actually offered a solution based on counting sort. However, the interviewer said it still used too much memory. Hence, I believe he was looking for something with O(1) memory usage. Not sure how that can be possible. @RaymondHettinger's solution is O(1), but it finds the median of the last x numbers. The interviewer was looking for the median of the complete set of numbers. Not sure how that is possible in O(1).Submariner
E
68

If the input is statistically distributed (e.g. normal, log-normal, etc.), then reservoir sampling is a reasonable way of estimating percentiles/medians from an arbitrarily long stream of numbers.

int n = 0;            // Running count of elements observed so far
#define SIZE 10000
int reservoir[SIZE];

// streamHasData(), readNumberFromStream(), and random(k) are assumed helpers;
// random(k) is taken to return a uniform integer in the range [0, k).
while (streamHasData())
{
    int x = readNumberFromStream();

    if (n < SIZE)
    {
        reservoir[n++] = x;   // Fill the reservoir first
    }
    else
    {
        int p = random(++n);  // Choose a random number 0 <= p < n
        if (p < SIZE)
        {
            reservoir[p] = x; // Keep the new element with probability SIZE/n
        }
    }
}

"reservoir" is then a running, uniform (fair), sample of all input - regardless of size. Finding the median (or any percentile) is then a straight-forward matter of sorting the reservoir and polling the interesting point.

Since the reservoir has a fixed size, the sort can be considered effectively O(1), and this method runs in constant time with constant memory consumption.
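
For instance, extracting the median (or any other percentile) from the filled reservoir might look like this; a sketch in Python, assuming reservoir holds the sample gathered above and is non-empty:

reservoir.sort()
median = reservoir[len(reservoir) // 2]        # 50th percentile
p95 = reservoir[int(len(reservoir) * 0.95)]    # any other percentile the same way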

Epizoon answered 21/5, 2012 at 23:5 Comment(4)
out of curiosity, why do you need variance?Phrygian
The stream might return fewer than SIZE elements, leaving the reservoir partially empty. This should be considered when computing the median.Housley
Is there a way to make this faster by calculating the difference instead of the median? Are the removed and added samples and the previous median enough information for that?Stagy
To add, "how big is the reservoir?" is the key challenge of this approachPolemist
B
61

If you can't hold all the items in memory at once, this problem becomes much harder. The heap solution requires you to hold all the elements in memory at once. This is not possible in most real world applications of this problem.

Instead, as you see numbers, keep a count of how many times each integer appears. Assuming 4-byte integers, that's 2^32 buckets, or at most 2^33 integers (a key and a count for each int), which is 2^35 bytes or 32 GB. It will likely be much less than this because you don't need to store the key or count for entries that are 0 (i.e. like a defaultdict in Python). Inserting each new integer takes constant time.

Then at any point, to find the median, just use the counts to determine which integer is the middle element. This takes constant time (albeit a large constant, but constant nonetheless).
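
A rough sketch of this counting approach in Python; a defaultdict stands in for the fixed 2^32-entry bucket array, so the median lookup below is linear in the number of distinct values seen rather than a fixed constant:

from collections import defaultdict

class CountingMedian:
    def __init__(self):
        self.counts = defaultdict(int)  # value -> number of times seen
        self.total = 0

    def add(self, num):
        self.counts[num] += 1           # constant time per insertion
        self.total += 1

    def median(self):
        # Walk the buckets in sorted order until we cross the midpoint;
        # returns the lower middle element when the total count is even.
        middle = (self.total - 1) // 2
        seen = 0
        for value in sorted(self.counts):
            seen += self.counts[value]
            if seen > middle:
                return value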

Bald answered 21/5, 2012 at 21:19 Comment(6)
If almost all of the numbers are seen only once, then a sparse list will take even more memory. And it seems rather likely that if you have so many numbers that they don't fit in memory, most of the numbers will appear only once. Despite that, this is a clever solution for massive counts of numbers.Omarr
For a sparse list, I agree, this is worse in terms of memory. Though if the integers are randomly distributed, you'll start to get duplicates a lot sooner than intuition implies. See mathworld.wolfram.com/BirthdayProblem.html. So I'm pretty sure this will become effective as soon as you have even a few GBs of data.Bald
@AndrewC can you pls explain how it will take constant time to find the median. If I have seen n different kind of integers then in the worst case last element may be the median. This makes median finding O(n) activity.Labaw
@Labaw Isn't n the total number of elements which is >>> 2^35 in this case?Sherrillsherrington
@Labaw You're right that it's still linear in the number of different integers you've seen, as VishAmdi said, the assumption I'm making for this solution is that n is the number of numbers you've seen, which is much bigger than 2^33. If you aren't seeing that many numbers, the maxheap solution is definitely better.Bald
@AndrewC The birthday problem doesn't apply much here -- while duplicates will be nearly guaranteed you'll still see very few of them on average for a uniform distribution.Godsey
C
35

The most efficient way to calculate a percentile of a stream that I have found is the P² algorithm: Raj Jain, Imrich Chlamtac: The P² Algorithm for Dynamic Calculation of Quantiles and Histograms Without Storing Observations. Commun. ACM 28(10): 1076-1085 (1985)

The algorithm is straightforward to implement and works extremely well. It is an estimate, however, so keep that in mind. From the abstract:

A heuristic algorithm is proposed for dynamic calculation of the median and other quantiles. The estimates are produced dynamically as the observations are generated. The observations are not stored; therefore, the algorithm has a very small and fixed storage requirement regardless of the number of observations. This makes it ideal for implementing in a quantile chip that can be used in industrial controllers and recorders. The algorithm is further extended to histogram plotting. The accuracy of the algorithm is analyzed.

Please refer to the Apache Commons Math implementation of PSquarePercentile.

Companion answered 21/5, 2012 at 23:14 Comment(6)
Count-Min Sketch is better than P^2 in that it also gives error bound while the latter does not.Chios
Also consider "Space-Efficient Online Computation of Quantile Summaries" by Greenwald and Khanna, which also gives error bounds and has good memory requirements.Pape
Also, for a probabilistic approach, see this blog post: research.neustar.biz/2013/09/16/… and the paper that it refers to is here: arxiv.org/pdf/1407.1121v1.pdf This is called "Frugal Streaming"Pape
The Frugal Streaming site went down, here’s an archive.org link: web.archive.org/web/20190430013331/http://research.neustar.biz/…Braxton
@Chios the count-min sketch algorithm's maximum error grows with the total number of seen elements and is inverse-proportional to the width of a temporary table, while the lower probability bound of that error depends on the table height. Also, the retrieval of the median requires a random access scan over the table involving computing hash values of somehow discretized values between min and max observed values. In contrast, P2 requires only a tiny amount of memory and retrieving the median just involves accessing a single variable.Goidelic
Please refer to Apache Commons Math implementation of PSquarePercentile at commons.apache.org/proper/commons-math/javadocs/api-4.0-beta1/…Sweater
W
32

If we want to find the median of the n most recently seen elements, this problem has an exact solution that only needs the n most recently seen elements to be kept in memory. It is fast and scales well.

An indexable skiplist supports O(ln n) insertion, removal, and indexed search of arbitrary elements while maintaining sorted order. When coupled with a FIFO queue that tracks the n-th oldest entry, the solution is simple:

from collections import deque
from itertools import islice
# IndexableSkiplist comes from the linked recipe; it is not in the standard library

class RunningMedian:
    'Fast running median with O(lg n) updates where n is the window size'

    def __init__(self, n, iterable):
        self.it = iter(iterable)
        self.queue = deque(islice(self.it, n))
        self.skiplist = IndexableSkiplist(n)
        for elem in self.queue:
            self.skiplist.insert(elem)

    def __iter__(self):
        queue = self.queue
        skiplist = self.skiplist
        midpoint = len(queue) // 2
        yield skiplist[midpoint]
        for newelem in self.it:
            oldelem = queue.popleft()
            skiplist.remove(oldelem)
            queue.append(newelem)
            skiplist.insert(newelem)
            yield skiplist[midpoint]

Here are links to complete working code (an easy-to-understand class version and an optimized generator version with the indexable skiplist code inlined):

Williamsen answered 22/5, 2012 at 5:36 Comment(3)
If I'm understanding it correctly though, this only gives you a median of the last N elements seen, not all the elements up to that point. This does seem like a really slick solution for that operation though.Bald
Right. The answer sounds as if it was possible to find the median of all elements by just keeping the last n elements in memory - that's impossible in general. The algorithm just finds the median of the last n elements.Bigford
The term "running median" is typically used to refer to the median of a subset of data. The OP is used a common term in a non-standard way.Mistress
R
20

An intuitive way to think about this is that if you had a full balanced binary search tree, then the root would be the median element, since there would be the same number of smaller and greater elements. Now, if the tree isn't full, this won't quite be the case, since there will be elements missing from the last level.

So what we can do instead is keep the median itself, plus two balanced binary trees: one for elements less than the median, and one for elements greater than the median. The two trees must be kept within one element of the same size.

When we get a new integer from the data stream, we compare it to the median. If it's greater than the median, we add it to the right tree. If the two tree sizes then differ by more than 1, we remove the min element of the right tree, make it the new median, and put the old median in the left tree. Similarly for smaller elements.
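
A minimal sketch of this scheme in Python; SortedList from the third-party sortedcontainers package stands in for the balanced binary search trees, and the TreeMedian name is my own:

from sortedcontainers import SortedList  # third-party: pip install sortedcontainers

class TreeMedian:
    def __init__(self, first):
        self.median = first
        self.left = SortedList()   # elements <= median
        self.right = SortedList()  # elements >= median

    def add(self, num):
        if num > self.median:
            self.right.add(num)
        else:
            self.left.add(num)
        # Keep the two sides within one element of each other by
        # rotating through the median.
        if len(self.right) - len(self.left) > 1:
            self.left.add(self.median)
            self.median = self.right.pop(0)   # min element of the right tree
        elif len(self.left) - len(self.right) > 1:
            self.right.add(self.median)
            self.median = self.left.pop()     # max element of the left tree

After each add, median is exact when the total count is odd; for an even count it is one of the two middle elements (average it with the nearer root for the conventional definition).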

Reste answered 22/5, 2012 at 18:59 Comment(2)
How are you going to do that? "we remove the min element of the right tree"Eleventh
I meant binary search trees, so the min element is all the way left from the root.Reste
F
9

Efficient is a word that depends on context. The solution to this problem depends on the number of queries performed relative to the number of insertions. Suppose you insert N numbers and are interested in the median K times toward the end. The heap-based algorithm's complexity would be O(N log N + K).

Consider the following alternative. Plunk the numbers in an array, and for each query, run the linear selection algorithm (using the quicksort pivot, say). Now you have an algorithm with running time O(K N).

Now if K is sufficiently small (infrequent queries), the latter algorithm is actually more efficient and vice versa.
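
For reference, a sketch of the query-side alternative in Python (randomized quickselect, expected O(N) per query; nothing is maintained between queries):

import random

def quickselect(items, k):
    # Return the k-th smallest element (0-based) in expected linear time.
    pivot = random.choice(items)
    lows = [x for x in items if x < pivot]
    pivots = [x for x in items if x == pivot]
    highs = [x for x in items if x > pivot]
    if k < len(lows):
        return quickselect(lows, k)
    if k < len(lows) + len(pivots):
        return pivot
    return quickselect(highs, k - len(lows) - len(pivots))

def median(items):
    # Lower middle element for even-sized input.
    return quickselect(items, (len(items) - 1) // 2)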

Foundation answered 21/5, 2012 at 20:50 Comment(2)
In the heap example, lookup is constant time, so I think it should be O(N log N + K), but your point still holds.Bald
Yes, good point, will edit this out. You're right N log N is still the leading term.Foundation
M
0

Here is my simple but efficient algorithm (in C++) for calculating running median from a stream of integers:

#include<algorithm>
#include<fstream>
#include<vector>
#include<list>

using namespace std;

void runningMedian(std::ifstream& ifs, std::ofstream& ofs, const unsigned bufSize) {
    if (bufSize < 1)
        throw exception("Wrong buffer size.");
    bool evenSize = bufSize % 2 == 0 ? true : false;
    list<int> q;
    vector<int> nums;
    int n;
    unsigned count = 0;
    while (ifs.good()) {
        ifs >> n;
        q.push_back(n);
        auto ub = std::upper_bound(nums.begin(), nums.end(), n);
        nums.insert(ub, n);
        count++;
        if (nums.size() >= bufSize) {
            auto it = std::find(nums.begin(), nums.end(), q.front());
            nums.erase(it);
            q.pop_front();
            if (evenSize)
                ofs << count << ": " << (static_cast<double>(nums[nums.size() / 2 - 1] +
                static_cast<double>(nums[nums.size() / 2]))) / 2.0 << '\n';
            else
                ofs << count << ": " << static_cast<double>(nums[nums.size() / 2]);
        }
    }
}

bufSize specifies the size of the window over which the running median is calculated. While numbers are read from the input stream ifs, a vector of size bufSize is maintained in sorted order. The median is the middle element of the sorted vector when bufSize is odd, or the average of the two middle elements when bufSize is even.

Additionally, I maintain a list of the last bufSize elements read from the input. When a new element arrives, I insert it at the right place in the sorted vector and remove from the vector the element that was added bufSize steps before (the value retained at the front of the list). At the same time I update the list: every new element is placed at the back, and every old element is removed from the front. After reaching bufSize elements, neither the list nor the vector grows any further: every insertion of a new element is compensated by the deletion of the element read bufSize steps earlier. Note that it does not matter whether the element removed from the vector is exactly the one placed bufSize steps before or just an element with the same value; the median is unaffected. All calculated median values are written to the output stream.

Miun answered 23/8, 2020 at 23:58 Comment(0)
S
0

I can confirm the answer by @schmil-the-cat is correct.

Here is an implementation in JS. I'm not an expert in algorithms, but I thought it might be useful for other people.


class Heap {
  constructor(isMin) {
    this.heap = [];
    this.isMin = isMin;
  }

  // Restore the heap property after an insert by sifting the last element up.
  siftUp() {
    let currentIndex = this.heap.length - 1;

    while (currentIndex > 0) {
      const parentIndex = Math.floor((currentIndex - 1) / 2);
      const parentValue = this.heap[parentIndex];
      const currentValue = this.heap[currentIndex];

      if (
        (this.isMin && parentValue <= currentValue) ||
        (!this.isMin && parentValue >= currentValue)
      ) {
        break;
      }

      this.heap[parentIndex] = currentValue;
      this.heap[currentIndex] = parentValue;

      currentIndex = parentIndex;
    }
  }

  // Restore the heap property after a pop by sifting the new root down.
  siftDown() {
    const better = (a, b) =>
      this.isMin ? this.heap[a] < this.heap[b] : this.heap[a] > this.heap[b];

    let currentIndex = 0;

    while (true) {
      const left = 2 * currentIndex + 1;
      const right = 2 * currentIndex + 2;
      let swapIndex = currentIndex;

      if (left < this.heap.length && better(left, swapIndex)) {
        swapIndex = left;
      }
      if (right < this.heap.length && better(right, swapIndex)) {
        swapIndex = right;
      }
      if (swapIndex === currentIndex) {
        break;
      }

      [this.heap[currentIndex], this.heap[swapIndex]] = [
        this.heap[swapIndex],
        this.heap[currentIndex],
      ];
      currentIndex = swapIndex;
    }
  }

  insert(val) {
    this.heap.push(val);
    this.siftUp();
  }

  // Remove and return the root: move the last element to the root slot,
  // then sift it down to restore the heap property.
  pop() {
    const top = this.heap[0];
    const last = this.heap.pop();
    if (this.heap.length > 0) {
      this.heap[0] = last;
      this.siftDown();
    }
    return top;
  }

  top() {
    return this.heap[0];
  }

  length() {
    return this.heap.length;
  }
}

function findMedian(arr) {
  const topHeap = new Heap(true);
  const bottomHeap = new Heap(false);

  const output = [];

  if (arr.length === 1) {
    return arr[0];
  }

  topHeap.insert(Math.max(arr[0], arr[1]));
  bottomHeap.insert(Math.min(arr[0], arr[1]));

  for (let i = 0; i < arr.length; i++) {
    const currentVal = arr[i];

    if (i === 0) {
      output.push(currentVal);
      continue;
    }

    if (i > 1) {
      if (currentVal < bottomHeap.top()) {
        bottomHeap.insert(currentVal);
      } else {
        topHeap.insert(currentVal);
      }
    }

    if (bottomHeap.length() - topHeap.length() > 1) {
      const bottomVal = bottomHeap.pop();
      topHeap.insert(bottomVal);
    }

    if (topHeap.length() - bottomHeap.length() > 1) {
      const topVal = topHeap.pop();
      bottomHeap.insert(topVal);
    }

    if (bottomHeap.length() === topHeap.length()) {
      output.push(Math.floor((bottomHeap.top() + topHeap.top()) / 2));
      continue;
    }

    if (bottomHeap.length() > topHeap.length()) {
      output.push(bottomHeap.top());
    } else {
      output.push(topHeap.top());
    }
  }

  return output;
}

Squishy answered 18/9, 2022 at 15:36 Comment(0)
U
-3

Can't you do this with just one heap? Update: no. See the comment.

Invariant: After reading 2*n inputs, the min-heap holds the n largest of them.

Loop: Read 2 inputs. Add them both to the heap, and remove the heap's min. This reestablishes the invariant.

So when 2n inputs have been read, the heap's min is the nth largest. There'll need to be a little extra complication to average the two elements around the median position and to handle queries after an odd number of inputs.

Uncinate answered 21/5, 2012 at 21:12 Comment(2)
Doesn't work: you can drop things that later turn out to be near the top. For instance, try your algorithm with the numbers 1 to 100, but in reverse order: 100, 99, ..., 1.Kinzer
Thanks, zellyn. Silly of me to convince myself the invariant was reestablished.Uncinate
