With Reactive Extensions (RX), is it possible to add a "Pause" command?

I have a class which takes in a stream of events, and pushes out another stream of events.

All of the events use Reactive Extensions (RX). The incoming stream of events is pushed from an external source into an IObserver<T> using .OnNext, and the outgoing stream of events is pushed out using IObservable<T> and .Subscribe. I am using Subject<T> to manage this, behind the scenes.

I am wondering what techniques there are in RX to pause the output temporarily. While the output is paused, incoming events would build up in an internal queue; once it is unpaused, the queued events would flow out again.

Brede answered 11/7, 2015 at 21:29 Comment(3)
One idea: while the output is paused, redirect events into an internal queue, and when the output is unpaused, flush the queue. - Brede
You haven't implemented your own IObserver<T>, have you? - Dissociate
No, all I've done is cast the internal Subject<T> to an IObserver<T> so that .OnNext can be exposed. - Brede
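
For reference, the queue idea from the comments can be sketched without any pausing operator. This is only an illustration under assumptions: PausableRelay, Pause, Resume and RelayDemo are hypothetical names (not part of Rx), and the lock assumes that pushing values while holding it is acceptable for the subscribers involved.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, not part of Rx: relays values and holds them back while paused.
public sealed class PausableRelay<T>
{
    private readonly Subject<T> _output = new Subject<T>();
    private readonly Queue<T> _pending = new Queue<T>();
    private readonly object _gate = new object();
    private bool _paused;

    // Outgoing stream; AsObservable hides the underlying Subject.
    public IObservable<T> Output => _output.AsObservable();

    // Incoming side: forward immediately, or queue while paused.
    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (_paused) _pending.Enqueue(value);
            else _output.OnNext(value);
        }
    }

    public void Pause()
    {
        lock (_gate) { _paused = true; }
    }

    // Flush queued values in arrival order, then resume normal flow.
    public void Resume()
    {
        lock (_gate)
        {
            while (_pending.Count > 0) _output.OnNext(_pending.Dequeue());
            _paused = false;
        }
    }
}

public static class RelayDemo
{
    public static List<int> Run()
    {
        var relay = new PausableRelay<int>();
        var seen = new List<int>();
        relay.Output.Subscribe(seen.Add);

        relay.OnNext(1);   // delivered immediately
        relay.Pause();
        relay.OnNext(2);   // queued
        relay.OnNext(3);   // queued
        relay.Resume();    // 2 and 3 are flushed in order
        relay.OnNext(4);   // delivered immediately
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The answers below reach the same behaviour with Rx operators, which avoids hand-written locking.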

Here is my solution using Buffer and Window operators:

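public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
{
    // Note: in this method 'true' from pauser lets values through and 'false' holds them back.
    // While held back: start buffering on 'false' and release the buffer on the next 'true'.
    var queue = source.Buffer(pauser.Where(toPause => !toPause),
                              _ => pauser.Where(toPause => toPause))
                      .SelectMany(l => l.ToObservable());

    // While flowing: open a window at subscription and on each 'true'; close it on 'false'.
    return source.Window(pauser.Where(toPause => toPause).StartWith(true),
                         _ => pauser.Where(toPause => !toPause))
                 .Switch()
                 .Merge(queue);
}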

In this code a 'true' from the pauser stream means values flow, and a 'false' means they are held back. A window is opened at subscription and every time 'true' is received from pauser; it closes when pauser emits 'false'.

Buffer covers the gaps: it collects the values that arrive between a 'false' and the next 'true' from pauser. Once that 'true' arrives, it emits an IList of the collected values, which is immediately flattened back into the output stream.
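
To make the true/false convention concrete, here is a small sketch that drives the operator with two Subjects, so everything runs synchronously on one thread. The extension method is the one from this answer, with only the lambda parameter renamed to flowing; WindowBufferDemo and the sample values are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PausableExtensions
{
    // Same operator as in the answer; 'true' lets values through, 'false' holds them back.
    public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
    {
        var queue = source.Buffer(pauser.Where(flowing => !flowing),
                                  _ => pauser.Where(flowing => flowing))
                          .SelectMany(l => l.ToObservable());

        return source.Window(pauser.Where(flowing => flowing).StartWith(true),
                             _ => pauser.Where(flowing => !flowing))
                     .Switch()
                     .Merge(queue);
    }
}

public static class WindowBufferDemo
{
    public static List<int> Run()
    {
        // Hot sources, so the multiple subscriptions inside Pausable are harmless here.
        var source = new Subject<int>();
        var pauser = new Subject<bool>();
        var seen = new List<int>();
        source.Pausable(pauser).Subscribe(seen.Add);

        source.OnNext(1);      // the initial window is open, so 1 passes through
        pauser.OnNext(false);  // closes the window and opens a buffer
        source.OnNext(2);      // buffered
        source.OnNext(3);      // buffered
        pauser.OnNext(true);   // releases the buffer and opens a new window
        source.OnNext(4);      // passes through the new window
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

With Subjects as inputs the values should come out in their original order; cold inputs behave differently, as the comments below discuss.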

DotNetFiddle link: https://dotnetfiddle.net/vGU5dJ

Outdare answered 13/7, 2015 at 11:47 Comment(8)
You probably need to do a pauser.Publish(ps => { ... }) and replace pauser with ps in your code; otherwise you're creating four subscriptions to pauser, and depending on the source of pauser that may make the method fail. - Dissociate
Yes, I just confirmed. You're creating multiple subscriptions. - Dissociate
You can Publish both streams, but it's not a requirement. For example, in my current app most sources originate from Subjects, so it makes sense to subscribe directly. - Outdare
Also, why not just var ps = pauser.Publish()? The final code is so nested that it is really hard to read. - Outdare
If you just do .Publish() you have a connectable observable. If you do .Publish(ps => ...) then you still end up with a normal observable. And, yes, it does make it harder to read; however, it does make it correct. It is certainly possible to break your code depending on how you create your observables: cold observables, for example, would cause grief without the .Publish(ps => ...). - Dissociate
I've tested it with var source = Observable.Generate(0, x => x < 20, x => x + 1, x => x, x => TimeSpan.FromSeconds(0.2)); var pausable = Observable.Generate(0, x => x < 100, x => x + 1, x => x % 2 == 0, x => TimeSpan.FromSeconds(1.0)); source.Pausable(pausable).Subscribe(x => x.Dump(), () => "Done.".Dump());. Without the .Publish(ps => ...) it doesn't pause and doesn't return the right values. With it, it does. However, in both cases it doesn't signal an end. - Dissociate
Yeah, I totally agree that to work in any possible situation you definitely need Publish. I guess I'm just spoiled, because most of my sources are hot without subscription side effects. As for Publish, wouldn't Publish().RefCount() also be a correct solution? - Outdare
.Publish().RefCount() would work fine, but you'd have to do var source2 = source.Publish().RefCount(); and then use source2 throughout. The same with pauser. It would work, but I'm not sure how much it saves you. - Dissociate
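
The .Publish(ps => ...) suggestion from these comments might look roughly like the sketch below. It is one reading of the comments, not the answer author's code: PausableShared, PublishDemo and the Defer-based subscription counter are assumptions added to show that each input is subscribed to only once. As noted in the comments, completion was reported not to be signalled, and this sketch does not address that.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PausableSharedExtensions
{
    // Hypothetical variant: share one subscription to each input via Publish(selector).
    public static IObservable<T> PausableShared<T>(this IObservable<T> source, IObservable<bool> pauser)
    {
        return source.Publish(ss => pauser.Publish(ps =>
        {
            // Same Buffer/Window logic as the answer, but on the shared streams ss and ps.
            var queue = ss.Buffer(ps.Where(flowing => !flowing),
                                  _ => ps.Where(flowing => flowing))
                          .SelectMany(l => l.ToObservable());

            return ss.Window(ps.Where(flowing => flowing).StartWith(true),
                             _ => ps.Where(flowing => !flowing))
                     .Switch()
                     .Merge(queue);
        }));
    }
}

public static class PublishDemo
{
    public static (List<int> Seen, int PauserSubscriptions) Run()
    {
        var source = new Subject<int>();
        var pauserSubject = new Subject<bool>();
        var pauserSubscriptions = 0;

        // Defer runs its factory once per subscription, so it can count them.
        var pauser = Observable.Defer(() =>
        {
            pauserSubscriptions++;
            return pauserSubject.AsObservable();
        });

        var seen = new List<int>();
        source.PausableShared(pauser).Subscribe(seen.Add);

        source.OnNext(1);
        pauserSubject.OnNext(false);  // hold values back
        source.OnNext(2);
        source.OnNext(3);
        pauserSubject.OnNext(true);   // release them
        source.OnNext(4);
        return (seen, pauserSubscriptions);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine(string.Join(", ", result.Seen));
        Console.WriteLine("pauser subscriptions: " + result.PauserSubscriptions);
    }
}
```

Unlike .Publish() on its own, the selector overload returns an ordinary observable, so callers do not have to call Connect themselves.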

Here's a reasonably simple Rx way to do what you want. I've created an extension method called Pausable that takes a source observable and a second observable of bool that pauses (true) or resumes (false) the output.

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

It can be used like this:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

The only thing I couldn't quite work out is what you mean by your "incoming stream of events is an IObserver<T>". Streams are IObservable<T>. Observers aren't streams. It sounds like you're not doing something right here. Can you add to your question and explain further, please?

Dissociate answered 13/7, 2015 at 4:55 Comment(4)
Thanks for your answer. I've updated my question to make the dataflow clearer. - Brede
As is, this apparently doesn't let through the first values, because it goes through the false branch and wants to concat values (which is effectively Observable.Never, I think?). I've hacked it into shape by initialising values to null the first time and checking in both branches. Not sure if there's anything more elegant. - Riker
Further warning to future self: ReplaySubject is the spawn of the devil. If you don't limit it (by buffer size or time) it will hang on to everything it ever sees, in case someone else comes along to subscribe. - Riker
This is a nice clean solution, but as previously noted there is a bug where initial events from the stream are lost, since "values" is not a completed stream (until the first true event). To fix it: 1) add a "bool initialState = false" parameter so you can start paused or unpaused; 2) after the "var values = ..." line, add the following: "if (!initialState) values.OnCompleted()". - Hebbe
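
Applying the fix from the last comment might look like the sketch below. The initialState parameter and the early OnCompleted call come from that comment; passing initialState to StartWith, the PausableFixed name and the demo class are assumptions. The buffer is still an unbounded ReplaySubject, so the retention warning above still applies.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PausableFixedExtensions
{
    // Hypothetical name; 'true' from pauser pauses the output, 'false' resumes it.
    public static IObservable<T> PausableFixed<T>(
        this IObservable<T> source,
        IObservable<bool> pauser,
        bool initialState = false)
    {
        return Observable.Create<T>(o =>
        {
            var paused = new SerialDisposable();
            var subscription = Observable.Publish(source, ps =>
            {
                var values = new ReplaySubject<T>();
                // Fix from the comment: when starting unpaused, complete the empty
                // buffer so that Concat moves straight on to the live stream.
                if (!initialState) values.OnCompleted();

                Func<bool, IObservable<T>> switcher = b =>
                {
                    if (b)
                    {
                        // Pausing: start a fresh buffer and feed it from the source.
                        values.Dispose();
                        values = new ReplaySubject<T>();
                        paused.Disposable = ps.Subscribe(values);
                        return Observable.Empty<T>();
                    }
                    // Resuming: replay the buffered values, then continue.
                    return values.Concat(ps);
                };

                // Assumption: the initial state is also what the pauser stream starts with.
                return pauser.StartWith(initialState).DistinctUntilChanged()
                    .Select(p => switcher(p))
                    .Switch();
            }).Subscribe(o);
            return new CompositeDisposable(subscription, paused);
        });
    }
}

public static class PausableFixedDemo
{
    public static List<int> Run()
    {
        var source = new Subject<int>();
        var pauser = new Subject<bool>();
        var seen = new List<int>();
        source.PausableFixed(pauser).Subscribe(seen.Add);

        source.OnNext(1);      // passes through; this is the value the original loses
        pauser.OnNext(true);   // pause
        source.OnNext(2);      // buffered
        source.OnNext(3);      // buffered
        pauser.OnNext(false);  // resume: 2 and 3 are replayed
        source.OnNext(4);      // passes through
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```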

You can simulate pausing/unpausing with an Observable.

Once your pauseObservable emits a 'paused' value, buffer events until pauseObservable emits an 'unpaused' value.

Here's an example which uses the BufferUntil implementation by Dave Sexton and Observable logic by Timothy Shields (from my own question a while back):

//Input events, hot observable
var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(i => i.ToString())
    .Publish().RefCount();

//Simulate pausing from Keyboard, not actually relevant within this answer
var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
    k => KeyPressed += k, k => KeyPressed -= k)
    .Select(i => i.EventArgs.PressedKey)
    .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause
    .DistinctUntilChanged();

//Let events through when not paused
var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused})
    .Where(i => !i.paused) //not paused
    .Select(i => i.value)
    .Subscribe(Console.WriteLine);

//When paused, buffer until not paused
var pausedEvents = pauseObservable.Where(i => i)
    .Subscribe(_ =>
        source.BufferUntil(pauseObservable.Where(i => !i))
            .Select(i => String.Join(Environment.NewLine, i))
            .Subscribe(Console.WriteLine));

Room for improvement: the two subscriptions to source (pausedEvents and notPausedEvents) could perhaps be merged into one.

Ellord answered 12/7, 2015 at 15:43 Comment(0)
