How to bind a non-disposable object with each subscription of a cold observable?

I am probably looking for a custom Observable.Using method that is not restricted to disposable resources. What I have is a cold IObservable that maintains some internal state, for example a Random instance. This instance should be bound not to the IObservable itself, but to each of its subscriptions. Each subscriber should use a different instance of this resource. Take a look, for example, at the GetRandomNumbers method below:

static IObservable<int> GetRandomNumbers()
{
    var random = new Random(0);
    return Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Select(x => random.Next(1, 10))
        .Take(10);
}

This method generates 10 random numbers. The RNG is a Random instance initialized with a constant seed, so it should always produce the same 10 numbers. But alas, it doesn't:

var stream = GetRandomNumbers();
Console.WriteLine($"Results A: {String.Join(", ", await stream.ToArray())}");
Console.WriteLine($"Results B: {String.Join(", ", await stream.ToArray())}");

Output:

Results A: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3
Results B: 3, 5, 6, 5, 9, 1, 8, 9, 7, 3

Each subscriber of the stream observable gets a different set of numbers! The Random instance is created once, when GetRandomNumbers is called, and is then shared by all subscribers. This is not only undesirable, but it also creates the risk of corrupting the internal state of the object, since the Random class is not thread-safe.

My attempt to solve this problem was to use the Using operator, which has a Func<TResource> resourceFactory parameter:

static IObservable<int> GetRandomNumbers()
{
    return Observable.Using(() => new Random(0), random =>
        Observable
            .Interval(TimeSpan.FromMilliseconds(100))
            .Select(x => random.Next(1, 10))
            .Take(10)
    );
}

This would be a perfect solution if Random were disposable (I tested it with a disposable class and it worked as expected), but it's not, so the code doesn't compile:

The type 'System.Random' cannot be used as type parameter 'TResource' in the generic type or method 'Observable.Using<TResult, TResource>(Func<TResource>, Func<TResource, IObservable<TResult>>)'. There is no implicit reference conversion from 'System.Random' to 'System.IDisposable'.

Could you suggest a solution to this problem?

Dorty answered 15/11, 2020 at 10:28

Observable.Defer is your friend if you want per-subscriber state.

Try this:

static IObservable<int> GetRandomNumbers() =>
    Observable
        .Defer(() =>
        {
            var random = new Random(0);
            return Observable
                .Interval(TimeSpan.FromMilliseconds(100))
                .Select(x => random.Next(1, 10))
                .Take(10);
        });

My results:

Results A: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3
Results B: 7, 8, 7, 6, 2, 6, 9, 4, 9, 3
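
The same idea can be wrapped in a reusable helper that mirrors the shape of Observable.Using but drops the IDisposable constraint. This is only a minimal sketch, assuming the System.Reactive package is referenced; the names ObservableEx and WithState are hypothetical and not part of Rx:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class ObservableEx
{
    // Hypothetical helper (not part of Rx): creates a fresh state object
    // for every subscription by deferring both factories until Subscribe.
    public static IObservable<TResult> WithState<TState, TResult>(
        Func<TState> stateFactory,
        Func<TState, IObservable<TResult>> observableFactory)
    {
        return Observable.Defer(() => observableFactory(stateFactory()));
    }
}

static class Program
{
    static IObservable<int> GetRandomNumbers() =>
        ObservableEx.WithState(() => new Random(0), random =>
            Observable
                .Interval(TimeSpan.FromMilliseconds(100))
                .Select(_ => random.Next(1, 10))
                .Take(10));

    static async Task Main()
    {
        var stream = GetRandomNumbers();
        // Each await subscribes again, so each run gets its own Random(0);
        // the two lines should therefore show the same sequence.
        Console.WriteLine($"Results A: {String.Join(", ", await stream.ToArray())}");
        Console.WriteLine($"Results B: {String.Join(", ", await stream.ToArray())}");
    }
}
```

Because nothing has to be released when the subscription ends, the helper needs no cleanup logic, which is why Defer alone is enough here.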
Tessietessier answered 16/11, 2020 at 23:54
