How to achieve sequence of timeouts with RX?
Asked Answered
P

2

5

The scenario is as follows: a device communicating over is considered connected if it makes a callback to the server within a short period of time. I want to create a class that encapsulates the functionality of keeping track of this status. On call to the device, the timeout should be reset. On callback, the connection is confirmed, and the status should be set to true, if the callback times out, it should be set to false. But the next call should be able to reset the timeout again indifferent to the current status.

I was thinking to achieve this with RX using swith and timeout. But I don't know why it stops working.

public class ConnectionStatus
{
private Subject<bool> pending = new Subject<bool>();
private Subject<bool> connected = new Subject<bool>();

public bool IsConnected { get; private set; }

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
    pending.Select(outer => connected.Timeout(TimeSpan.FromSeconds(timeoutSeconds))) 
        .Switch()
        .Subscribe(_ => IsConnected = true, e => IsConnected = false, token);
}

public void ConfirmConnected()
{
    connected.OnNext(true);
}

public void SetPending()
{
    pending.OnNext(true);
}
}

This is the "test case":

var c = new ConnectionStatus(default(CancellationToken));

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();   
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(20));
c.IsConnected.Dump(); // FALSE, OK
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(10));
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, NOT OK!

I assume that the timeout of the inner observable is also stopping the outer observable. As the outer => lambda is not called anymore. What is the correct way?

Thank you

Postcard answered 15/11, 2017 at 12:59 Comment(0)
H
4

The problem is that Timeout essentially causes an Exception blowing up the Rx subscriptions. After the timeout is triggered (as you have it coded), no other notifications will be sent. Rx grammar is that you can have * OnNext messages followed by either one OnCompleted or one OnError. After the OnError from the Timeout is sent, you'll see no more messages.

You need to have the Timeout message delivered via OnNext messages instead of an OnError message. In your old code, you turned any OnError into a false, and any OnNext into a true. Instead you need to embed the proper new IsConnected value into OnNext messages. Here's how to do that:

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
    pending.Select(_ => connected
            .Timeout(TimeSpan.FromSeconds(timeoutSeconds))
            .Materialize()
            .Select(n => n.Kind == NotificationKind.OnError && n.Exception.GetType() == typeof(TimeoutException) 
                ? Notification.CreateOnNext(false)
                : n)
            .Dematerialize()
            .Take(1)
        )
        .Switch()
        .Subscribe(b => IsConnected = b, token);
}
Heroic answered 15/11, 2017 at 14:9 Comment(4)
Thank you. I have tested your suggestion, and I have found that it needs a .FirstOrDefaultAsync() after Select. Without it, the timeout is triggered even after the connected mitted the value. I have found another solution meanwhile: pending.Select(_ => connected.Buffer(TimeSpan.FromSeconds(timeoutSeconds), 1).FirstOrDefaultAsync()) .Switch().Subscribe(l => IsConnected = l.Count > 0, token);Postcard
In what case did you find FirstOrDefaultAsync to be required?Heroic
c.SetPending(); await Task.Delay(TimeSpan.FromSeconds(5)); c.ConfirmConnected(); c.IsConnected.Dump() /*TRUE, OK*/; await Task.Delay(TimeSpan.FromSeconds(20)); c.IsConnected.Dump(); The later should be still TRUE, but it is FALSE without FirstOrDefaultAsync.Postcard
Amended answer to fix that bug, and another possible one (exceptions wouldn't be propagated). Your alternate is fine.Heroic
C
6

Here's an alternative way to produce the stream of IsConnected values without using .TimeOut:

public class ConnectionStatus
{
    private Subject<Unit> pending = new Subject<Unit>();
    private Subject<Unit> connected = new Subject<Unit>();

    public bool IsConnected { get; private set; }

    public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
    {
        pending
            .Select(outer =>
                Observable.Amb(
                    connected.Select(_ => true),
                    Observable.Timer(TimeSpan.FromSeconds(timeoutSeconds)).Select(_ => false)))
            .Switch()
            .Subscribe(isConnected => IsConnected = isConnected, token);
    }

    public void ConfirmConnected()
    {
        connected.OnNext(Unit.Default);
    }

    public void SetPending()
    {
        pending.OnNext(Unit.Default);
    }
}

The Observable.Amb operator simply takes a value from whichever observable produces a value first - it's preferable to coding with exceptions.

Chicalote answered 16/11, 2017 at 6:8 Comment(0)
H
4

The problem is that Timeout essentially causes an Exception blowing up the Rx subscriptions. After the timeout is triggered (as you have it coded), no other notifications will be sent. Rx grammar is that you can have * OnNext messages followed by either one OnCompleted or one OnError. After the OnError from the Timeout is sent, you'll see no more messages.

You need to have the Timeout message delivered via OnNext messages instead of an OnError message. In your old code, you turned any OnError into a false, and any OnNext into a true. Instead you need to embed the proper new IsConnected value into OnNext messages. Here's how to do that:

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
    pending.Select(_ => connected
            .Timeout(TimeSpan.FromSeconds(timeoutSeconds))
            .Materialize()
            .Select(n => n.Kind == NotificationKind.OnError && n.Exception.GetType() == typeof(TimeoutException) 
                ? Notification.CreateOnNext(false)
                : n)
            .Dematerialize()
            .Take(1)
        )
        .Switch()
        .Subscribe(b => IsConnected = b, token);
}
Heroic answered 15/11, 2017 at 14:9 Comment(4)
Thank you. I have tested your suggestion, and I have found that it needs a .FirstOrDefaultAsync() after Select. Without it, the timeout is triggered even after the connected mitted the value. I have found another solution meanwhile: pending.Select(_ => connected.Buffer(TimeSpan.FromSeconds(timeoutSeconds), 1).FirstOrDefaultAsync()) .Switch().Subscribe(l => IsConnected = l.Count > 0, token);Postcard
In what case did you find FirstOrDefaultAsync to be required?Heroic
c.SetPending(); await Task.Delay(TimeSpan.FromSeconds(5)); c.ConfirmConnected(); c.IsConnected.Dump() /*TRUE, OK*/; await Task.Delay(TimeSpan.FromSeconds(20)); c.IsConnected.Dump(); The later should be still TRUE, but it is FALSE without FirstOrDefaultAsync.Postcard
Amended answer to fix that bug, and another possible one (exceptions wouldn't be propagated). Your alternate is fine.Heroic

© 2022 - 2024 — McMap. All rights reserved.