How can I use Reactive Extensions to throttle Events using a max window size?

Scenario:

I am building a UI application that gets notifications from a backend service every few milliseconds. Once I get a new notification, I want to update the UI as soon as possible.

As I can get lots of notifications within a short amount of time, and as I only ever care about the latest event, I use the Throttle() method of the Reactive Extensions framework. This lets me ignore notification events that are immediately followed by a new notification, so my UI stays responsive.

Problem:

Say I throttle the stream of notification events to 50ms and the backend sends a notification every 10ms. The Throttle() method will then never return an event, as it keeps resetting its sliding window again and again. Here I need some additional behaviour to specify something like a timeout, so that I can retrieve at least one event per second or so under such a high throughput of events. How can I do this with Reactive Extensions?
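
The starvation described above can be reproduced deterministically in virtual time. The following is a minimal sketch, assuming the System.Reactive package; the `ThrottleStarvationDemo` class and the 10 ms `Observable.Interval` source are hypothetical stand-ins for the real backend and are not part of the original question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ThrottleStarvationDemo
{
    // Returns the values Throttle lets through during one second of virtual time.
    public static List<long> Run()
    {
        var scheduler = new HistoricalScheduler();
        var received = new List<long>();

        // Hypothetical stand-in for the backend: one notification every 10 ms.
        var notifications = Observable.Interval(TimeSpan.FromMilliseconds(10), scheduler);

        using (notifications
            .Throttle(TimeSpan.FromMilliseconds(50), scheduler)
            .Subscribe(received.Add))
        {
            // Advance virtual time; no real waiting is involved.
            scheduler.AdvanceBy(TimeSpan.FromSeconds(1));
        }

        return received;
    }
}
```

Calling `ThrottleStarvationDemo.Run()` should yield an empty list, because every notification arrives before the 50 ms quiet period can elapse.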

Penniepenniless answered 17/11, 2013 at 18:25 Comment(0)

As James stated, Observable.Sample will give you the latest value yielded. However, it does so on a fixed timer, not relative to when the first event in the throttle window occurred. More importantly, if your sample time is high (say ten seconds) and your event fires right after a sample is taken, you won't get that new event for almost ten seconds.
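
To make the latency concrete, here is a small virtual-time sketch, assuming the System.Reactive package. The `SampleLatencyDemo` class and the `"update"` payload are hypothetical names used only for illustration. It raises a single event 1 ms after a sampling tick and measures how long Sample holds it back.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SampleLatencyDemo
{
    // Returns how long (in virtual time) a single event waited before Sample delivered it.
    public static TimeSpan Run()
    {
        var scheduler = new HistoricalScheduler();
        var source = new Subject<string>();
        DateTimeOffset? raisedAt = null;
        DateTimeOffset? deliveredAt = null;

        using (source
            .Sample(TimeSpan.FromSeconds(10), scheduler)
            .Subscribe(_ => deliveredAt = scheduler.Now))
        {
            // Move just past the first sampling tick at 10 s, then raise the event.
            scheduler.AdvanceBy(TimeSpan.FromSeconds(10) + TimeSpan.FromMilliseconds(1));
            raisedAt = scheduler.Now;
            source.OnNext("update");

            // Keep advancing so the following sampling tick gets a chance to fire.
            scheduler.AdvanceBy(TimeSpan.FromSeconds(15));
        }

        return deliveredAt.Value - raisedAt.Value;
    }
}
```

Under these assumptions `SampleLatencyDemo.Run()` should come out just under ten seconds, whereas a plain Throttle would have delivered the same lone event one due time after it was raised.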

If you need something a little tighter, you'll need to implement your own function. I've taken the liberty of doing so. This code could definitely use some cleanup, but I believe it does what you've asked for.

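using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ObservableEx
{
    public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime)
    {
        return source.ThrottleMax(dueTime, maxTime, Scheduler.Default);
    }

    public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime, IScheduler scheduler)
    {
        return Observable.Create<T>(o =>
        {
            // Guards the shared state: timers fire on the scheduler, values arrive from the source.
            var gate = new object();
            var hasValue = false;
            T value = default(T);

            var maxTimeDisposable = new SerialDisposable();
            var dueTimeDisposable = new SerialDisposable();

            // Emits the latest buffered value (if any) and cancels both timers.
            Action action = () =>
            {
                lock (gate)
                {
                    if (hasValue)
                    {
                        maxTimeDisposable.Disposable = Disposable.Empty;
                        dueTimeDisposable.Disposable = Disposable.Empty;
                        o.OnNext(value);
                        hasValue = false;
                    }
                }
            };

            var subscription = source.Subscribe(
                x =>
                {
                    lock (gate)
                    {
                        // The first value of a burst starts the maximum-wait timer.
                        if (!hasValue)
                        {
                            maxTimeDisposable.Disposable = scheduler.Schedule(maxTime, action);
                        }

                        hasValue = true;
                        value = x;

                        // Every value restarts the quiet-period timer.
                        dueTimeDisposable.Disposable = scheduler.Schedule(dueTime, action);
                    }
                },
                o.OnError,
                o.OnCompleted
            );

            // Unsubscribing also cancels any pending timers.
            return new CompositeDisposable(subscription, maxTimeDisposable, dueTimeDisposable);
        });
    }
}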

And a few tests...

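using System;
using Microsoft.Reactive.Testing;                    // TestScheduler, ReactiveTest (provides OnNext)
using Microsoft.VisualStudio.TestTools.UnitTesting;  // [TestClass], [TestMethod]

[TestClass]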
public class ThrottleMaxTests : ReactiveTest
{
    [TestMethod]
    public void CanThrottle()
    {

        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(200, 1)
            );
    }

    [TestMethod]
    public void CanThrottleWithMaximumInterval()
    {

        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(175, 2),
            OnNext(250, 3),
            OnNext(325, 4),
            OnNext(400, 5)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(350, 4),
            OnNext(500, 5)
            );
    }

    [TestMethod]
    public void CanThrottleWithoutMaximumIntervalInterferance()
    {
        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(325, 2)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(200, 1),
            OnNext(425, 2)
            );
    }
}
Jessalyn answered 18/11, 2013 at 3:27 Comment(3)
Interesting. I certainly can see maximum throttle duration being useful. - Hallway
Code for OnNext and TestScheduler is missing. I realise this is a very old answer, but do you still have it? - Surfboat
I did a quick search and found this, but it's been a while since I've worked with C#, and things change quickly. Does VS intellisense/resharper or VS Code C# plugin not suggest an import? github.com/dotnet/reactive/blob/master/Rx.NET/Source/src/… example usage: github.com/dotnet/reactive/blob/… - Jessalyn

Don't use Observable.Throttle; use Observable.Sample like this, where the TimeSpan gives the desired minimum interval between updates:

source.Sample(TimeSpan.FromMilliseconds(50))
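
As a rough illustration of how this behaves under the high-throughput scenario from the question, here is a virtual-time sketch assuming the System.Reactive package. The `SampleFastSourceDemo` class and the `Observable.Timer` source are hypothetical stand-ins for the real notification stream.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SampleFastSourceDemo
{
    // Returns the values Sample lets through during 160 ms of virtual time.
    public static List<long> Run()
    {
        var scheduler = new HistoricalScheduler();
        var received = new List<long>();

        // Hypothetical backend: first notification after 5 ms, then one every 10 ms.
        // The 5 ms offset keeps notifications from coinciding with sampling ticks.
        var notifications = Observable.Timer(
            TimeSpan.FromMilliseconds(5), TimeSpan.FromMilliseconds(10), scheduler);

        using (notifications
            .Sample(TimeSpan.FromMilliseconds(50), scheduler)
            .Subscribe(received.Add))
        {
            // Sampling ticks fall at 50, 100 and 150 ms within this window.
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(160));
        }

        return received;
    }
}
```

Unlike Throttle, the sampled stream keeps producing output while the source is busy: `SampleFastSourceDemo.Run()` should return one value per sampling tick, each being the most recent notification at that moment.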
Hallway answered 18/11, 2013 at 0:1 Comment(6)
If you do want to see how to put a limit on throttling btw, see my answer here: #19996705 - Hallway
You might also add .DistinctUntilChanged(), as that will prevent the same event from being "sampled" over and over again, which would better match Throttle, since it won't yield the same event multiple times. source.Sample(TimeSpan.FromMilliseconds(50)).DistinctUntilChanged() - Jessalyn
However, this solution won't work if you need events almost immediately when no new events occur, but must not wait longer than a specified time period after an event has occurred. This might warrant special interest... - Jessalyn
I think given the scenario of updating a UI every second, it's probably OK ;) - .Throttle adds some delay too of course. - Hallway
@Chris Sample doesn't reissue the same event if nothing has happened over an interval btw, so DistinctUntilChanged is only needed if you want to eliminate consecutive value-equal events. Also note the important distinction in placing it before or after Sample. - Hallway
Ahh, I may have been working off knowledge from an old version of RxJS, or just been completely oblivious. Thanks! - Jessalyn
