Take the last item pushed to an Observable (Sequence)

I have an IObservable<Item> inside a class and I want to expose a read-only property that provides the last item pushed to the observable at a given time. So it will provide a single value of Item.

If no value has been pushed, then it will have to return a default value.

How can I do this without having to subscribe to the observable and having a "backing field"?

Suppressive answered 4/2, 2017 at 16:8 Comment(2)
What do you mean by the "last item pushed to the observable"? The observable's current value? - Service
Yes! If the observable sequence is {1, 4, 6, ..., 400}, reading the property will return 400. - Suppressive

Here is another way to define the Value property, in the spirit of Asti's solution.

private readonly IObservable<Item> _source;
private readonly IObservable<Item> _lastValue;

public SomeClass() // Constructor
{
    _source = /* Initialize the source observable (hot) */

    _lastValue = _source
        .Catch(Observable.Never<Item>())
        .Concat(Observable.Never<Item>())
        .Publish(default)
        .AutoConnect(0)
        .FirstAsync();
}

public Item Value => _lastValue.Wait();

The Publish operator that accepts an initialValue parameter...

Returns a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue. This operator is a specialization of Multicast using a BehaviorSubject<T>.

The BehaviorSubject<T> is a specialized ISubject<T> that...

Represents a value that changes over time. Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.

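The Catch and Concat operators are there to preserve the last value even if the source sequence terminates, whether it completes normally or fails with an error. In both cases the sequence is continued with Observable.Never, so the BehaviorSubject<T> never receives a terminal notification.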

Personally I would be hesitant to use this solution, since a volatile field updated in the Do operator would accomplish the same thing more naturally. I am posting it mainly as a demonstration of the Rx capabilities.
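
As a rough illustration of that simpler alternative, here is a minimal sketch of a volatile field updated in the Do operator. It assumes that Item is a reference type (volatile is not allowed on an arbitrary struct), that the source is passed in through the constructor, and that errors from the source can be ignored; the Item class shown here is a hypothetical stand-in, not the asker's type.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical stand-in for the Item type from the question.
public sealed class Item
{
    public int Id { get; }
    public Item(int id) => Id = id;
}

public sealed class SomeClass : IDisposable
{
    // Stays null (the default for a reference type) until the first item arrives.
    private volatile Item _latest;
    private readonly IDisposable _subscription;

    // Assumption: the hot source is injected rather than created here.
    public SomeClass(IObservable<Item> source)
    {
        _subscription = source
            .Do(item => _latest = item)        // remember the most recent item
            .Subscribe(_ => { }, _ => { });    // ignore errors so the last value is kept
    }

    // Synchronous, non-blocking read of the latest item.
    public Item Value => _latest;

    public void Dispose() => _subscription.Dispose();
}
```

Reading Value before anything has been pushed returns null. After the source pushes an item, Value returns that item, and it keeps returning the last one after the source completes or fails.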

Splenic answered 4/12, 2020 at 14:41 Comment(0)

Just to supplement @Asti's answer a bit here, and perhaps help you with your frustration:

An observable isn't a physical 'thing', it's more a logical concept. Rx is often compared to LINQ, and it's a fair comparison much of the time. It breaks down though when you start talking data structures: LINQ's enumerables are similar enough to Lists for learning purposes.

However, on the Rx side, there's simply no good equivalent to List. An observable is a transient data structure, and all operators deal with this transient state. If you're looking for permanent state, you're leaving Rx.

Having said that, converting an observable to some sort of state is a common problem, and there are some packages that may help you: ReactiveUI is perhaps the best known. ReactiveProperty is another. Both of these packages are flawed, but may help you.

If you're simply looking for an easier way to get a backing field, without writing out the boilerplate each time, this will work:

public static class ReactivePropertyExtensions
{
    public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source)
    {
        return new ReactiveProperty<T>(source);
    }

    public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T defaultValue)
    {
        return new ReactiveProperty<T>(source, defaultValue);
    }
}

public class ReactiveProperty<T> : IDisposable
{
    private IObservable<T> Source { get; }
    private IDisposable Subscription { get; }
    public T Value { get; private set; }

    public ReactiveProperty(IObservable<T> source)
        : this(source, default(T)) { }

    public ReactiveProperty(IObservable<T> source, T defaultValue)
    {
        Value = defaultValue;
        Source = source;
        Subscription = source.Subscribe(t => Value = t);
    }

    public void Dispose()
    {
        Subscription.Dispose();
    }
}

Example use:

var ticker = Observable.Interval(TimeSpan.FromSeconds(1))
    .Publish().RefCount();

var latestTickerValue = ticker.ToReactiveProperty();
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine(latestTickerValue.Value);
Compline answered 7/2, 2017 at 14:46 Comment(4)
After some time, I'm re-reading your answer. It's great, but there's one statement I don't understand. About ReactiveUI and ReactiveProperty you say: "Both of these packages are flawed, but may help you". What do you mean? Why do you think they're flawed? Thanks in advance! - Suppressive
This is all opinion, so I'm not sure this is the right forum: ReactiveProperty has a huge number of dependencies because the author crams support for all platforms into one package. So if you just want one function or class, you're introducing a good deal of bloat to your project. - Compline
ReactiveUI seems to be in a constant state of flux, with occasional breaking changes and not enough solid documentation. The boilerplate required for Reactive Properties is also awkward. That one's the real pity, if you ask me: it's so close to being fantastic, but always seems two steps away. - Compline
Speaking of boilerplate in ReactiveUI, I highly recommend this Fody extension: github.com/kswoll/ReactiveUI.Fody - Tyndale

Assuming a hot observable, first set up a replay of the latest item:

var observable = source.Replay(1);
observable.Connect();

Provide the value with:

public int Value => observable.Take(1).Amb(Observable.Return(defaultValue)).Wait();

This will return a default value in case no values have been pushed.

You want a transition from Reactive to state, so a backing field isn't a terrible option. You mentioned that you don't want to subscribe, but to observe anything, something somewhere has to subscribe.
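
Put together, the approach above might look like the following sketch. The LatestValue<T> wrapper and its member names are hypothetical, chosen only for illustration. The sketch does not handle a source that completes or fails before pushing anything; in that case Wait would likely throw.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical wrapper that exposes the most recent item of a hot observable.
public sealed class LatestValue<T> : IDisposable
{
    private readonly IConnectableObservable<T> _replayed;
    private readonly IDisposable _connection;
    private readonly T _defaultValue;

    public LatestValue(IObservable<T> source, T defaultValue)
    {
        _defaultValue = defaultValue;
        _replayed = source.Replay(1);        // buffer only the most recent item
        _connection = _replayed.Connect();   // the single subscription to the source
    }

    // If an item is buffered, it is replayed immediately on subscribe and wins the Amb;
    // otherwise Observable.Return supplies the default value.
    public T Value => _replayed
        .Take(1)
        .Amb(Observable.Return(_defaultValue))
        .Wait();

    public void Dispose() => _connection.Dispose();
}
```

With a Subject<int> as the source and -1 as the default, Value is -1 before the first OnNext call and equals the most recently pushed number afterwards.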

Lakia answered 4/2, 2017 at 16:54 Comment(5)
OK, but that is async. My property shouldn't be async. Also, LastAsync returns the last element only once the sequence has finished, but I want to return the most recent value pushed at the moment the property is read. - Suppressive
@Suppressive Do you want the last value pushed to the observable while the observable is still running, without having a backing field? So it has nothing to do with the observable completing? - Lakia
Yes, exactly that! - Suppressive
OK, I must say I don't like the Wait at all, but I think there's no alternative. I finally chose to have a backing field that is set in a subscription: source.Subscribe(v => property = v) - Suppressive
Oh, and it'll never actually have to wait, because either the Return or the Replay emits as soon as it is subscribed to. - Lakia
