Reactive Throttle returning all items added within the TimeSpan
Asked Answered
M

3

11

Given an IObservable<T> is there a way to use Throttle behaviour (reset a timer when an item is added, but have it return a collection of all the items added within that time?

Buffer provides a similar functionality it that it chunks the data up into IList<T> on every time span or count. But I need that time to reset each time an item is added.

I've seen a similar question here, Does reactive extensions support rolling buffers?, but the answers don't seem ideal and it's a little old so I wondered if the release version of Rx-Main now supports this functionality out the box.

Morrie answered 13/1, 2012 at 11:24 Comment(3)
It sounds like my BufferWithInactivity answer in https://mcmap.net/q/492262/-does-reactive-extensions-support-rolling-buffers is what you're asking for. Can you clarify your question please?Agar
@Agar It is, it's exactly the functionality I'm after. I referenced that question in my question :) But I don't like that answer, the answerer has explicitly stated it's work in progress.Morrie
Not sure what you're asking. If the timer gets reset every time an item is "added" (propagated?) how will there be anything to buffer in the first place?Kohinoor
P
14

As I answered in the other post, yes you can! Using the Throttle and Window methods of Observable:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}
Ponce answered 20/3, 2012 at 17:28 Comment(1)
Nice answer! But shouldn't one publish the stream using return stream.Publish(hot =>...), to avoid subscribing twice to cold observables?Wing
M
4

I amended Colonel Panic's BufferUntilInactive operator by adding a Publish component, so that it works correctly with cold observables too:

/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
{
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
        published
            .Window(() => published.Throttle(dueTime, scheduler))
            .SelectMany(window => window.ToList())
    );
}

For completeness I've also added an optional IScheduler parameter, which configures the scheduler where the timer is run.

Mackle answered 27/12, 2020 at 9:26 Comment(1)
For a BufferUntilInactive variant that also has a maxCount parameter, you can look here.Mackle
B
0

Wouldn't it work with

Observable.BufferWithTimeOrCount<TSource> Method (IObservable<TSource>, TimeSpan, Int32)?

Bravery answered 13/1, 2012 at 12:19 Comment(1)
Nope, the time component will kick in immediately and buffer everything until that time elapsed, and the count will buffer n number of items. I need the buffer time to reset when an item is added (like Throttle).Morrie

© 2022 - 2024 — McMap. All rights reserved.