Trouble Implementing a Sliding Window in Rx

I created a SlidingWindow operator for Reactive Extensions because I want an easy way to monitor things like rolling averages. As a simple example, I want to subscribe to mouse events, but each time there's an event I want to receive the last three (rather than waiting for every third event to receive the last three). That's why the Window overloads I found don't seem to give me what I need out of the box.

This is what I came up with. I fear that it might not be the most performant solution, given its frequent List operations:

public static IObservable<List<T>> SlidingWindow<T>(this IObservable<T> seq, int length)
{
    var seed = new List<T>();

    Func<List<T>, T, List<T>> accumulator = (list, arg2) =>
    {
        list.Add(arg2);

        if (list.Count > length)
            list.RemoveRange(0, (list.Count - length));

        return list;
    };

    return seq.Scan(seed, accumulator)
                .Where(list => list.Count == length);
}

It can be called this way:

var rollingSequence = Observable.Range(1, 5).SlidingWindow(3).ToEnumerable();

However, to my great surprise, instead of receiving the expected results

1,2,3
2,3,4
3,4,5

I receive the results

2,3,4
3,4,5
3,4,5

Any insights would be much appreciated!

Lethbridge asked 6/3, 2013 at 19:43
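A likely cause, offered as a hedged reading rather than a confirmed diagnosis: Scan passes the *same* List<T> instance downstream at every step, and the accumulator keeps mutating that instance afterwards. Anything that holds on to an emitted list and reads it later (ToEnumerable queues values for the enumerating thread) can see contents from a later step. The exact values observed depend on when each list is read. The sketch below reproduces the aliasing without Rx by using a plain loop in place of Scan. ScanAliasingDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ScanAliasingDemo
{
    // Same logic as the accumulator in the question: mutate and return the one list.
    static List<T> Accumulate<T>(List<T> list, T item, int length)
    {
        list.Add(item);
        if (list.Count > length)
            list.RemoveRange(0, list.Count - length);
        return list;
    }

    // Runs a Scan-like loop over 1..5 and records each full window twice:
    // once as the shared list reference, once as an independent copy.
    public static (List<List<int>> Shared, List<int[]> Snapshots) Run(int length = 3)
    {
        var state = new List<int>();            // plays the role of Scan's seed
        var shared = new List<List<int>>();
        var snapshots = new List<int[]>();

        foreach (var x in Enumerable.Range(1, 5))
        {
            state = Accumulate(state, x, length);
            if (state.Count == length)
            {
                shared.Add(state);              // every entry is the same object
                snapshots.Add(state.ToArray()); // copy taken at "emission" time
            }
        }
        return (shared, snapshots);
    }

    public static void Main()
    {
        var (shared, snapshots) = Run();
        // Read after the loop, every shared entry shows the final state 3,4,5
        foreach (var w in shared) Console.WriteLine(string.Join(",", w));
        // The snapshots keep 1,2,3 / 2,3,4 / 3,4,5
        foreach (var w in snapshots) Console.WriteLine(string.Join(",", w));
    }
}
```

If this is the cause, copying before emitting (for example, appending .Select(list => list.ToList()) after the Where) should give each consumer an independent window. A separate issue is that the seed list is created once per call to SlidingWindow, so two subscriptions to the same returned observable would share and mutate one list. Creating the list inside Observable.Defer is one way to give each subscription its own list.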

Try this instead. I'd have to sit down and think about its relative performance, but it's likely at least as good, and much easier to read:

public static IObservable<IList<T>> SlidingWindow<T>(
    this IObservable<T> src,
    int windowSize)
{
    var feed = src.Publish().RefCount();
    // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list
    return Observable.Zip(
        Enumerable.Range(0, windowSize)
            .Select(skip => feed.Skip(skip))
            .ToArray());
}
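To show what the Zip of skipped copies computes, here is a hedged, non-Rx analogue over an in-memory list. It is not how Rx implements Zip, and ZipOfSkipsDemo and ZipOfSkips are hypothetical names. Lane k is the source with its first k items skipped, and window i is the i-th element taken from every lane. Because a zip stops at its shortest input, no partial windows appear at the end.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ZipOfSkipsDemo
{
    // Lane k = source.Skip(k); window i = { lane0[i], lane1[i], ..., lane(n-1)[i] }.
    public static List<IList<T>> ZipOfSkips<T>(IReadOnlyList<T> source, int windowSize)
    {
        var lanes = Enumerable.Range(0, windowSize)
                              .Select(k => source.Skip(k).ToList())
                              .ToList();
        // Like Zip, stop when the shortest lane (the one skipped most) runs out.
        int windows = lanes.Min(lane => lane.Count);
        var result = new List<IList<T>>();
        for (int i = 0; i < windows; i++)
            result.Add(lanes.Select(lane => lane[i]).ToList());
        return result;
    }

    public static void Main()
    {
        // Prints 0,1,2 then 1,2,3 ... up to 7,8,9
        foreach (var w in ZipOfSkips(Enumerable.Range(0, 10).ToList(), 3))
            Console.WriteLine(string.Join(",", w));
    }
}
```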

Test rig:

var source = Observable.Range(0, 10);
var query = source.SlidingWindow(3);
// Join the items; Console.WriteLine on the list itself would only print its type name
using (query.Subscribe(window => Console.WriteLine(string.Join(",", window))))
{
    Console.ReadLine();
}

Output:

0,1,2
1,2,3
2,3,4
3,4,5
4,5,6
...

EDIT: As an aside, I've found myself compulsively adding .Publish().RefCount() ever since I was burned once by leaving it out... I don't think it's strictly required here, though.

EDIT for yzorg:

If you augment the method like so, you'll see the runtime behavior more clearly:

public static IObservable<IList<T>> SlidingWindow<T>(
    this IObservable<T> src,
    int windowSize)
{
    var feed = src.Publish().RefCount();
    // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list
    return Observable.Zip(
        Enumerable.Range(0, windowSize)
            .Select(skip =>
            {
                // ToArray() runs this once per lane when SlidingWindow is called, not per element
                Console.WriteLine("Skipping {0} els", skip);
                return feed.Skip(skip);
            })
            .ToArray());
}
Bourgogne answered 6/3, 2013 at 20:20
Comments:
@Lethbridge No problem. In fact, thanks for "making" me write that out; I've used it myself a couple of times since answering this. ;) - Bourgogne
I don't think this is good. Combining .Publish(), .Range(0, x) and .Skip() looks like bad performance, specifically O(n^2), because the Skip is going to iterate the entire stream over and over. For example, you'd need to iterate 30,000 integers to get (10000, 10001, 10002). So you're not really keeping a sliding buffer of the source stream in memory; you'd have to keep the entire source stream (since the beginning of time) in memory, which is what I thought we were avoiding. - Gosport
Also, this is intended for reasonably sized windows... I wouldn't use it for, say, a 10k window. - Bourgogne
James World's or Luke's should be the accepted answer. - Ilana

Using your original test, with an argument of 3 for count, this gives the desired results:

public static IObservable<IList<T>> SlidingWindow<T>(
    this IObservable<T> source, int count)
{
    return source.Buffer(count, 1)
                 .Where(list => list.Count == count);
}

Testing like this:

var source = Observable.Range(1, 5);
var query = source.SlidingWindow(3);
using (query.Subscribe(i => Console.WriteLine(string.Join(",", i))))
{

}

Output:

1,2,3
2,3,4
3,4,5
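Why the Where is needed: Buffer(count, 1) starts a new buffer at every item, and when the source completes it also emits the buffers that never filled up. The non-Rx sketch below mimics that behaviour over an IEnumerable<T> so the partial tail windows are visible. BufferDemo and BufferCountSkipOne are hypothetical names, not Rx APIs, and this is not Rx's actual implementation. One consequence: if the source never completes, those partial buffers are never emitted at all.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BufferDemo
{
    // Mimics Buffer(count, 1): one buffer opens per item, a full buffer is emitted,
    // and on completion the remaining (partial) buffers are flushed, oldest first.
    public static IEnumerable<IList<T>> BufferCountSkipOne<T>(IEnumerable<T> source, int count)
    {
        var open = new List<List<T>>();
        foreach (var item in source)
        {
            open.Add(new List<T>());
            foreach (var buffer in open)
                buffer.Add(item);
            if (open[0].Count == count)
            {
                yield return open[0];
                open.RemoveAt(0);
            }
        }
        foreach (var partial in open)
            yield return partial;
    }

    public static void Main()
    {
        var raw = BufferCountSkipOne(Enumerable.Range(1, 5), 3).ToList();
        // Without the filter: 1,2,3 / 2,3,4 / 3,4,5 / 4,5 / 5
        foreach (var w in raw) Console.WriteLine(string.Join(",", w));
        // With the answer's filter, only the three full windows remain
        foreach (var w in raw.Where(b => b.Count == 3)) Console.WriteLine(string.Join(",", w));
    }
}
```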
Topmost answered 17/5, 2013 at 15:07
Comments:
I know this is quite an old answer, but is .Where(list => list.Count == count) needed? I tried it without, just using Buffer(count, 1), and it seems to work as well. - Shirline
It will fail for the last count - 1 events in the sequence, since these are emitted with fewer than count items, and the OP specifically asked for count items in the window. Try it with the test case in my answer to see what happens: you'll get two further events, containing "4,5" and just "5". - Topmost
You're right. In my previous test I used an ISubject instead of an IObservable. According to the documentation ISubject extends IObservable, so maybe a different policy applies in that case, which would be why the final "4,5" and "5" were not returned even without the Where. I'm new to Rx and still have a lot to learn. - Shirline

Just use source.Window(count, 1) or source.Buffer(count, 1). Either gives a window/buffer of count items, sliding by one.

Pindaric answered 5/3, 2014 at 0:32

The sliding window implementations here were not adequate for my idea of a sliding window. The closest was Buffer(N, 1), but it has a problem: it waits for the first N items before emitting the first result, and then slides beyond the end of the sequence. I want up to N items emitted at a time.

I ended up with this implementation:

public static IObservable<IList<T>> SlidingWindow<T>(this IObservable<T> obs, int windowSize) =>
    Observable.Create<IList<T>>(observer =>
    {
        var buffer = new CircularBuffer<T>(windowSize);
        return obs.Subscribe(
            value =>
            {
                buffer.Add(value);
                observer.OnNext(buffer.ToList());
            },
            ex => observer.OnError(ex),
            () => observer.OnCompleted()
        );
    });

I was initially using a queue for the buffer but wanted to use something a bit more lightweight.
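For comparison, a queue-based version of the same "up to N items" behaviour might look like the following. It works over IEnumerable<T> rather than IObservable<T> so it runs without Rx. QueueWindowDemo and LeadingWindows are hypothetical names, and this is a sketch, not the author's original queue code. Like the buffer.ToList() call above, it copies the current contents at every step so consumers never share mutable state.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class QueueWindowDemo
{
    // Emits a snapshot after every item, holding at most windowSize of the latest items.
    public static IEnumerable<IList<T>> LeadingWindows<T>(IEnumerable<T> source, int windowSize)
    {
        var queue = new Queue<T>(windowSize);
        foreach (var item in source)
        {
            queue.Enqueue(item);
            if (queue.Count > windowSize)
                queue.Dequeue();          // drop the oldest item
            yield return queue.ToList();  // independent copy for the consumer
        }
    }

    public static void Main()
    {
        // [0], [0,1], [0,1,2], [1,2,3], ... , [7,8,9]
        foreach (var w in LeadingWindows(Enumerable.Range(0, 10), 3))
            Console.WriteLine("[" + string.Join(",", w) + "]");
        // A rolling average over the same windows, as in the question's motivation
        foreach (var avg in LeadingWindows(Enumerable.Range(0, 10), 3).Select(w => w.Average()))
            Console.WriteLine(avg);
    }
}
```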

using System;
using System.Collections;
using System.Collections.Generic;

public class CircularBuffer<T> : IReadOnlyList<T>
{
    private readonly T[] buffer;
    private int offset;
    private int count;
    public CircularBuffer(int bufferSize) => this.buffer = new T[bufferSize];
    public int Capacity => buffer.Length;
    public int Count => count;
    public T this[int index] => index < 0 || index >= count
        ? throw new ArgumentOutOfRangeException(nameof(index))
        : buffer[(offset + index) % buffer.Length];
    public void Add(T value)
    {
        buffer[(offset + count) % buffer.Length] = value;
        if (count < buffer.Length) count++;
        else offset = (offset + 1) % buffer.Length;
    }
    public IEnumerator<T> GetEnumerator()
    {
        for (var i = 0; i < count; i++)
            yield return buffer[(offset + i) % buffer.Length];
    }
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

For Observable.Range(0, 10).SlidingWindow(3), it yields these windows:

 0,1,2,3,4,5,6,7,8,9
[0]
[0,1]
[0,1,2]
  [1,2,3]
    [2,3,4]
      [3,4,5]
        [4,5,6]
          [5,6,7]
            [6,7,8]
              [7,8,9]
Roadbed answered 24/1, 2020 at 22:29
Comments:
My particular use case is restricting the log output of a task in LINQPad to at most N items, to avoid hitting the output window's limit, e.g. logs.SlidingWindow(1000).DumpLatest() - Roadbed

© 2022 - 2024 — McMap. All rights reserved.