Rx Buffer without empty calls to subscriber
Asked Answered
D

2

11

In my WPF application using .Net 4.6 I have an event which fires new data points at a high rate (several hundred per second), but not all the time. This data is displayed in a chart.

I would like to update the chart every 50 ms and not after each new data point.
To achieve that I though of using Buffer(TimeSpan.FromMilliseconds(50)) from Rx, which in theory works fine. BUT my subscriber is also called every 50 ms if no new data points are created which is not exactly what I want.

I created a little sample application to test that out:

using System;
using System.Reactive.Linq;

namespace RxTester
{
    public class Program
    {
        private static event EventHandler TheEvent;

        static void Main(string[] args)
        {
            var observable = Observable.FromEvent<EventHandler, EventArgs>(h => (s, e) => h(e), h => TheEvent += h, h => TheEvent -= h);
            var subscriber = observable.Buffer(TimeSpan.FromMilliseconds(1000))
                .Subscribe(e => Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: {e.Count} elements received..."));

            var random = new Random();
            var timer = new System.Timers.Timer(2000)
                {
                    AutoReset = true,
                    Enabled = true
                };
            timer.Elapsed += (s, e) =>
                {
                    var amount = random.Next(1, 10);
                    for (int i = 0; i < amount; ++i)
                        TheEvent?.Invoke(null, null);
                };

            Console.ReadLine();

            timer.Enabled = false;
            subscriber.Dispose();
        }
    }
}

You need to add the "Rx-Linq" NuGet package for it to run or use the following Fiddle: https://dotnetfiddle.net/TV5tD4

There you see several "0 elements received" which is what I would like to avoid. I know I could simple check for e.Count == 0, but as I use multiple of such buffers this does not seem optimal to me.

Is there a way to only create new buffered blocks of element if elements are available?
I am also open for other approaches to solve my problem of batching events on a time basis - I already looked into TPL Dataflows BatchBlock, but that seems to only support count based block sizes.

Dichloride answered 24/2, 2016 at 15:27 Comment(1)
D
5

Once again we can use the powerful GroupByUntil method to create this extension

public static IObservable<IList<TSource>> BufferWhenAvailable<TSource>
                                          (this IObservable<TSource> source, 
                                           TimeSpan threshold)
{
    return source.Publish( sp => 
                    sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold))
                      .SelectMany(i => i.ToList()));

}
Dynamism answered 24/2, 2016 at 19:22 Comment(5)
Thanks for this - seems to work fine. BUT return source.GroupByUntil(_ => true, _ => Observable.Timer(threshold)).SelectMany(g => g.ToList()); seems to also work just fine - what is the benefit of the Observable.Create around it?Dichloride
@ChrFin in this case there is no benefit. I'll edit the answer :)Dynamism
...and if you think about it Observable.FromEvent is always hot. So Publish could also be omitted.Dynamism
Nice. Is the Publish part required?Kwei
@LeeCampbell in this case there is no shared subscription. So no it isn't necessary.Dynamism
K
4

The standard way of doing this is simply

.Buffer(period)
.Where(buffer=>buffer.Any())

So effectively doing what you want to avoid (count==0). However, this check is very cheap and I would imagine if far cheaper than the other cost involved i.e. Scheduling. The only concern might be the amount allocations that are happening (every 50ms creating a List<T>) and then the impending GC Gen0 pressure that may build.

Kwei answered 25/2, 2016 at 1:4 Comment(5)
Thanks for your input, but The only concern might be the amount allocations that are happening is why I want to avoid this, as I possible have multiple of these buffers at once...Dichloride
I hear ya. Sounds like it would a good one to add to the tool box. If I come up with a soln to this will get back to you github.com/LeeCampbell/RxCookbook/issues/27Kwei
@LeeCampbell if I understand correctly: even if you prevent Buffer(TimeSpan) from yielding empty buffers, the solution is still different. The buffers are "started" based on the timer, not when a new value arrives.Dynamism
Yes. Buffer will use constant cadence windows. If you want to start buffers based on when values arrive, then you probably want to look into the GroupJoin operator introtorx.com/Content/v1.0.10621.0/… and produce something custom yourself.Kwei
Upvote on supertopii's answer (though I would like to see the supporting unit tests)Kwei

© 2022 - 2024 — McMap. All rights reserved.