The scenario is as follows: a device communicating over is considered connected if it makes a callback to the server within a short period of time. I want to create a class that encapsulates keeping track of this status. On each call to the device, the timeout should be reset. On callback, the connection is confirmed and the status should be set to true; if the callback times out, the status should be set to false. The next call should be able to reset the timeout again, regardless of the current status.
I was thinking of achieving this with Rx using `Switch` and `Timeout`, but I don't know why it stops working.

    public class ConnectionStatus
    {
        private Subject<bool> pending = new Subject<bool>();
        private Subject<bool> connected = new Subject<bool>();

        public bool IsConnected { get; private set; }

        public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
        {
            pending.Select(outer => connected.Timeout(TimeSpan.FromSeconds(timeoutSeconds)))
                .Switch()
                .Subscribe(_ => IsConnected = true, e => IsConnected = false, token);
        }

        public void ConfirmConnected()
        {
            connected.OnNext(true);
        }

        public void SetPending()
        {
            pending.OnNext(true);
        }
    }

This is the "test case":

    var c = new ConnectionStatus(default(CancellationToken));

    c.SetPending();
    await Task.Delay(TimeSpan.FromSeconds(5));
    c.ConfirmConnected();
    c.IsConnected.Dump(); // TRUE, OK

    c.SetPending();
    await Task.Delay(TimeSpan.FromSeconds(5));
    c.ConfirmConnected();
    c.IsConnected.Dump(); // TRUE, OK

    c.SetPending();
    await Task.Delay(TimeSpan.FromSeconds(20));
    c.IsConnected.Dump(); // FALSE, OK
    c.ConfirmConnected();
    c.IsConnected.Dump(); // FALSE, OK

    c.SetPending();
    await Task.Delay(TimeSpan.FromSeconds(10));
    c.ConfirmConnected();
    c.IsConnected.Dump(); // FALSE, NOT OK!

I assume that the timeout of the inner observable also stops the outer observable, since the `outer =>` lambda is no longer called. What is the correct way to do this?
Thank you
`.FirstOrDefaultAsync()` after `Select`. Without it, the timeout is triggered even after `connected` has emitted the value. I have found another solution meanwhile: `pending.Select(_ => connected.Buffer(TimeSpan.FromSeconds(timeoutSeconds), 1).FirstOrDefaultAsync()).Switch().Subscribe(l => IsConnected = l.Count > 0, token);`
– Postcard
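
The behaviour in the question is consistent with the Rx contract: `Timeout` reports a timeout through `OnError`, `Switch` forwards an inner error to the subscriber, and an error ends the subscription, so later `pending` values are never seen. One possible way around this, shown here only as a minimal sketch, is to use the `Timeout` overload that takes a fallback sequence, so that a timeout becomes an ordinary `false` value instead of an error. The class name `ConnectionStatusSketch` and the `TimeSpan` constructor parameter are assumptions made for illustration and are not part of the original code. `Take(1)` is used so that the timer stops once the first confirmation arrives.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical variant of the class from the question (name and
// constructor signature are assumptions made for this sketch).
public class ConnectionStatusSketch
{
    private readonly Subject<bool> pending = new Subject<bool>();
    private readonly Subject<bool> connected = new Subject<bool>();

    public bool IsConnected { get; private set; }

    public ConnectionStatusSketch(TimeSpan timeout, CancellationToken token = default)
    {
        pending
            // Each pending call starts a fresh inner sequence.
            .Select(_ => connected
                // Only the first confirmation matters; completing here
                // also stops the timeout timer.
                .Take(1)
                // On timeout, fall back to a single 'false' value instead
                // of raising OnError, so the outer pipeline stays alive.
                .Timeout(timeout, Observable.Return(false)))
            // A new pending call abandons the previous inner sequence.
            .Switch()
            .Subscribe(value => IsConnected = value, token);
    }

    public void ConfirmConnected() => connected.OnNext(true);

    public void SetPending() => pending.OnNext(true);
}
```

With this shape, a confirmation that arrives after a timeout and without a new `SetPending()` call is ignored, because the inner sequence has already completed. Note that the timeout fires on a scheduler thread, so `IsConnected` is written from a different thread than the one calling `ConfirmConnected()`; the sketch does not add any synchronization for that.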