How to merge multiple observables with order preservation and maximum concurrency?

I have a nested observable IObservable<IObservable<T>>, and I want to flatten it to an IObservable<T>. I don't want to use the Concat operator, because it delays the subscription to each inner observable until the previous one has completed. This is a problem because the inner observables are cold, and I want them to start emitting T values immediately after they are emitted by the outer observable. I also don't want to use the Merge operator, because it messes up the order of the emitted values. The marble diagram below shows the problematic (for my case) behavior of the Merge operator, as well as the desirable merging behavior.

Stream of observables: +----1------2-----3----|
Observable-1         :      +--A-----------------B-------|
Observable-2         :             +---C---------------------D------|
Observable-3         :                   +--E--------------------F-------|
Merge (undesirable)  : +-------A-------C----E----B-----------D---F-------|
Desirable merging    : +-------A-----------------B-------C---D------EF---|

All values emitted by the Observable-1 should precede any value emitted by the Observable-2. The same should be true with the Observable-2 and Observable-3, and so on.

What I like about the Merge operator is that it allows configuring the maximum number of concurrent subscriptions to inner observables. I would like to preserve this functionality in the custom MergeOrdered operator I am trying to implement. Here is my under-construction method:

public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)
{
    return source.Merge(maximumConcurrency); // How to make it ordered?
}

And here is a usage example:

var source = Observable
    .Interval(TimeSpan.FromMilliseconds(300))
    .Take(4)
    .Select(x => Observable
        .Interval(TimeSpan.FromMilliseconds(200))
        .Select(y => $"{x + 1}-{(char)(65 + y)}")
        .Take(3));

var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");

Output (undesirable):

Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C

The desirable output is:

Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C

Clarification: Regarding the ordering of the values, the values themselves are irrelevant. What matters is the order of the inner sequence each value originates from, and the value's position in that sequence. All values from the first inner sequence should be emitted first (in their original order), then all the values from the second inner sequence, then all the values from the third, etc.

Nealson answered 15/11, 2020 at 4:52 Comment(7)
This is the closest "duplicate" I was able to find. It is somewhat relevant, but has specific nuances that make it both broader and narrower than my question. - Nealson
Interesting. You have this that you can play with, and there is source code as well; maybe that can get you on the right track: rxmarbles.com - Pisolite
Have you considered creating hot observables from the cold ones and then just using Concat? - Photon
@Photon yes, I had this idea too, using the Publish operator. But I was losing values, and I also haven't managed to combine it with the maximumConcurrency functionality. - Nealson
@Pisolite does rxmarbles.com allow creating the complex marble diagram shown in this question, for the requested MergeOrdered operator? AFAICS I can only choose one of the predefined operators, and move the diagram bullets left and right with the mouse. - Nealson
@TheodorZoulias the idea behind my comment was that there is source code to look at, in case you have to code it yourself. - Pisolite
@Pisolite hmm, I don't understand how this can help me. The people who wrote that source code were not trying to solve my problem. And my problem is very specific and quite unique. It's not something that people ask every day. - Nealson

I figured out a solution to this problem, using a combination of the Merge, Merge(1)¹ and Replay operators. The Merge operator enforces the concurrency policy, and the Merge(1) operator enforces the ordered sequential emission. To prevent the Merge from messing up the order of the emitted values, an extra wrapping of the inner sequences is introduced. Each inner sequence is projected to an IObservable<IObservable<T>> that immediately emits the inner sequence, and later completes when the inner sequence completes. This wrapping is implemented using the Observable.Create method:

public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)
{
    return source.Select(inner => inner.Replay(buffered => Observable
        .Create<IObservable<T>>(observer =>
    {
        observer.OnNext(buffered);
        return buffered.Subscribe(_ => { }, observer.OnError, observer.OnCompleted);
    })))
    .Merge(maximumConcurrency)
    .Merge(1);
}

The Replay operator buffers all the messages emitted by the inner sequences, so that they are not lost in the interval between the subscription by the Merge and the subscription by the Merge(1).
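The buffering effect can be seen in isolation with a minimal sketch. It makes a few assumptions that are not part of the answer: it uses the parameterless Replay() overload together with Connect (instead of the selector overload used in MergeOrdered), and a hypothetical Subject named inner stands in for an inner sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for an inner sequence (not part of the original answer).
var inner = new Subject<int>();

// The parameterless Replay() returns a connectable observable with an unbounded buffer.
var buffered = inner.Replay();
using var connection = buffered.Connect(); // start listening to the source now

inner.OnNext(1); // nobody has subscribed to 'buffered' yet
inner.OnNext(2);

var received = new List<int>();
// The late subscriber first receives the buffered 1 and 2, then any live values.
using var subscription = buffered.Subscribe(x => received.Add(x));

inner.OnNext(3);
inner.OnCompleted();

Console.WriteLine(string.Join(", ", received)); // prints "1, 2, 3"
```

In MergeOrdered the late subscriber is the Merge(1) operator, which may subscribe to a buffered sequence long after the Merge has started it.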

The funny thing is that, because of the wrapping, an intermediate IObservable<IObservable<IObservable<T>>> sequence is created. This scary thing is then unwrapped twice, first by the Merge and then by the Merge(1) operator.
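To check the ordering without depending on timers, the pipeline can be driven by hand. The following is only a sketch under stated assumptions: the operator is rewritten as a local function so that the snippet is self-contained, and the subjects outer, first and second are hypothetical hot stand-ins for the cold sequences of the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Same pipeline as the MergeOrdered operator, written as a local function
// (an assumption made here only to keep the sketch self-contained).
static IObservable<T> MergeOrdered<T>(
    IObservable<IObservable<T>> source, int maximumConcurrency)
{
    return source.Select(inner => inner.Replay(buffered => Observable
        .Create<IObservable<T>>(observer =>
    {
        observer.OnNext(buffered);
        return buffered.Subscribe(_ => { }, observer.OnError, observer.OnCompleted);
    })))
    .Merge(maximumConcurrency)
    .Merge(1);
}

// Hypothetical hand-driven sources (not part of the original answer).
var outer = new Subject<IObservable<string>>();
var first = new Subject<string>();
var second = new Subject<string>();

var results = new List<string>();
using var subscription = MergeOrdered(outer, 2).Subscribe(x => results.Add(x));

outer.OnNext(first);
first.OnNext("A");
outer.OnNext(second);
second.OnNext("C");  // emitted while the first sequence is still active
first.OnNext("B");
second.OnNext("D");
first.OnCompleted(); // the values of the second sequence are released from here on
second.OnCompleted();
outer.OnCompleted();

Console.WriteLine(string.Join(", ", results));
```

If the operator behaves as described, C and D should be held back until the first sequence completes, so the values should come out grouped by inner sequence.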

This is not a perfectly efficient solution, because there is no reason for the inner sequence currently subscribed by the Merge(1) to be buffered. Optimizing this inefficiency is not trivial though, so I'll leave it as is. In scenarios where each subsequence contains a small number of elements, the impact of this flaw should be negligible. Attempting to fix it could even do more harm than good in these scenarios.

¹ Ideally I would like to use the Concat operator instead of the equivalent but less efficient Merge(1). Unfortunately the Concat operator behaves weirdly in the current version of the Rx library (5.0.0). I even encountered a deadlock when using the Concat in a fairly complex query, which was resolved by switching to the Merge(1) operator.


Note: The original implementation of this answer, featuring a SemaphoreSlim for controlling the concurrency instead of the Merge operator, can be found in the 1st revision. The Merge-based implementation should be better, because it doesn't involve fire-and-forget task continuations, and the subscription to the inner sequences happens synchronously, instead of being offloaded to the ThreadPool.

Nealson answered 17/11, 2020 at 18:5 Comment(0)

There's no way for this observable to know if the last value of any of the inner observables will be the first value that should be produced.

As an example, you could have this:

Stream of observables: +--1---2---3--|
Observable-1         :    +------------B--------A-|
Observable-2         :        +--C--------D-|
Observable-3         :            +-E--------F-|
Desirable merging    : +------------------------ABCDEF|

In this case, I'd do this:

IObservable<char> query =
    sources
        .ToObservable()
        .Merge()
        .ToArray()
        .SelectMany(xs => xs.OrderBy(x => x));
Megasporophyll answered 15/11, 2020 at 6:47 Comment(1)
Hi Enigmativity, thanks for the answer! I may not have explained the requirements well enough. The values themselves are not important. The primary objective is to preserve the order of the inner sequences. In your example the desirable outcome is +---------------B--------ACDEF|, because the first sequence emitted B-A, the second C-D, and the third E-F. I edited the question to make the requirements clearer. - Nealson