I have a nested observable IObservable<IObservable<T>>
, and I want to flatten it to a IObservable<T>
. I don't want to use the Concat
operator because it delays the subscription to each inner observable until the completion of the previous observable. This is a problem because the inner observables are cold, and I want them to start emitting T
values immediately after they are emitted by the outer observable. I also don't want to use the Merge
operator because it messes the order of the emitted values. The marble diagram below shows the problematic (for my case) behavior of the Merge
operator, as well as the Desirable merging behavior.
Stream of observables: +----1------2-----3----|
Observable-1 : +--A-----------------B-------|
Observable-2 : +---C---------------------D------|
Observable-3 : +--E--------------------F-------|
Merge (undesirable) : +-------A-------C----E----B-----------D---F-------|
Desirable merging : +-------A-----------------B-------C---D------EF---|
All values emitted by the Observable-1 should precede any value emitted by the Observable-2. The same should be true with the Observable-2 and Observable-3, and so on.
What I like with the Merge
operator is that it allows to configure the maximum concurrent subscriptions to inner observables. I would like to preserve this functionality with the custom MergeOrdered
operator I am trying to implement. Here is my under-construction method:
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
{
return source.Merge(maximumConcurrency); // How to make it ordered?
}
And here is a usage example:
var source = Observable
.Interval(TimeSpan.FromMilliseconds(300))
.Take(4)
.Select(x => Observable
.Interval(TimeSpan.FromMilliseconds(200))
.Select(y => $"{x + 1}-{(char)(65 + y)}")
.Take(3));
var results = await source.MergeOrdered(2).ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");
Output (undesirable):
Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C
The desirable output is:
Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C
Clarification: Regarding the ordering of the values, the values themselves are irrelevant. What matters is the order of their originated inner sequence, and their position in that sequence. All values from the first inner sequence should be emitted first (in their original order), then all the values from the second inner sequence, then all the values from the third, etc.
Concat
? – PhotonPublish
operator. But I was losing values, and also I haven't managed to combine it with themaximumConcurrency
functionality. – NealsonMergeOrdered
operator? AFAICS I can only choose one of the predefined operators, and move the diagram bullets left and right with the mouse. – Nealson