RxJava / RxJs: How to merge two source observables but complete as soon as one of them completes

I have two source observables. I would like to merge them, but the merged observable should complete as soon as one of the source observables completes.

Desired behavior:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4--x

In case of an error on one of the sources, the error should propagate to the merged observable:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------e
"merged"  ---1---2----3--4--ex

The "merge" operator only completes the merged stream when both sources have completed:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4-----------------------------x

How can I achieve my desired behavior?

Abbott answered 19/12, 2017 at 14:12 Comment(0)

You need to work with the metadata, that is, the information about each observable's events. To do this, use the materialize() operator on each stream and then use dematerialize() on the merged stream to actually emit the data.

Observable.merge( observableA.materialize(),
                  observableB.materialize() )
  .takeWhile( notification -> notification.hasValue() )
  .dematerialize()
  .subscribe( ... );

This will merge the two observables until either one of them completes or emits an error.

Hansom answered 19/12, 2017 at 14:39 Comment(5)
Nice! A few syntax corrections: -> should be => and hasValue is a property, not a function. - Interlingua
Sorry, I was writing the response in Java. Translation to other languages will be required. - Hansom
Ah, I was thinking in JS, but I guess the question is tagged for both :) - Interlingua
Won't the takeWhile convert failures to completions? It seems the dematerialize is all that is actually needed: it will emit complete or failure when either of the sources does, and that should terminate the merge. - Quotient
The comment from @MichaelKrussel is correct. takeWhile will hide failures. On the other hand, simply omitting it and dematerializing will produce the expected result: the merged stream emits either error or complete as soon as any observable errors out or completes. - Eniwetok
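
The point raised in the last two comments can be checked without any Rx library. The sketch below is a simplified, synchronous model and not RxJava or RxJS code: each source is a hypothetical array of timestamped notification objects, and `onNext`, `onError`, `onComplete`, `mergeByTime` and `run` are made-up helpers for illustration. `run` mimics dematerialize, optionally with the takeWhile filter in front of it.

```javascript
// Simplified model: a source is an array of timestamped notifications.
// kind: 'N' = next, 'E' = error, 'C' = complete. All names are hypothetical.
const onNext = (t, value) => ({ t, kind: 'N', value });
const onError = (t, err) => ({ t, kind: 'E', error: err });
const onComplete = (t) => ({ t, kind: 'C' });

// Stand-in for merging the materialized sources: order all notifications by time.
function mergeByTime(...sources) {
  return [].concat(...sources).sort((a, b) => a.t - b.t);
}

// Replay the merged notifications into a log, the way a subscriber would see them.
// withTakeWhile = true models takeWhile(n => n.hasValue) before dematerialize.
function run(notifications, withTakeWhile) {
  const log = [];
  for (const n of notifications) {
    if (withTakeWhile && n.kind !== 'N') {
      log.push('complete'); // the filter ends the stream, so the error is never seen
      return log;
    }
    if (n.kind === 'N') {
      log.push(`next ${n.value}`);
    } else if (n.kind === 'E') {
      log.push(`error ${n.error}`); // first terminal notification ends the stream
      return log;
    } else {
      log.push('complete');
      return log;
    }
  }
  return log;
}

// Roughly the second marble diagram from the question: source 2 fails first.
const source1 = [onNext(3, 1), onNext(12, 3), onNext(15, 4), onComplete(45)];
const source2 = [onNext(7, 2), onError(18, 'boom')];
const merged = mergeByTime(source1, source2);

const withFilter = run(merged, true);
const withoutFilter = run(merged, false);
console.log(withFilter);    // [ 'next 1', 'next 2', 'next 3', 'next 4', 'complete' ]
console.log(withoutFilter); // [ 'next 1', 'next 2', 'next 3', 'next 4', 'error boom' ]
```

In this model the filtered run ends with a plain completion even though source 2 failed, while the unfiltered run surfaces the error. The real operators also handle scheduling and unsubscription, which the sketch ignores.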

I sure hope someone else answers with a more elegant method, but this works.

I think you would have to use one of the take operators. You could complete all sources when one source completes like so:

const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
Rx.Observable.merge(a.takeUntil(b.last()), b.takeUntil(a.last()))
  .subscribe(
    x => { console.log('next', x); },
    null,
    () => { console.log('complete'); }
  );
  
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>

Or a less readable but more scalable version:

function merge(...obs) {
  // Complete each source as soon as any other source completes (last() emits on completion).
  return Rx.Observable.merge(...obs.map(x =>
    x.takeUntil(Rx.Observable.race(obs.filter(y => y !== x).map(z => z.last())))
  ));
}

const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);

merge(a, b)
  .subscribe(
    x => { console.log('next', x); },
    null,
    () => { console.log('complete'); }
  );

Here is an illustration with error propagation:

function merge(...obs) {
  // Complete each source as soon as any other source completes (last() emits on completion).
  return Rx.Observable.merge(...obs.map(x =>
    x.takeUntil(Rx.Observable.race(obs.filter(y => y !== x).map(z => z.last())))
  ));
}

const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
const c = Rx.Observable.timer(2200).map(x => { throw 'oops!'; });

merge(a, b, c)
  .subscribe(
    x => { console.log('next', x); },
    x => { console.log('error', x); },
    () => { console.log('complete'); }
  );

Using takeUntil outside, on the merged stream, is tricky because you would lose the last emitted value.

Interlingua answered 19/12, 2017 at 14:36 Comment(0)

When an observable completes, it does not emit a value, but we can concat it with another 'signal' observable that emits a single value. We can then watch for the 'signal' observable's value with the takeWhile operator.

Of course you'd have to ensure that the 'signal' observable's emitted value is not a value that could be emitted by the observables that are being merged - an empty object will suffice if the takeWhile predicate compares by reference.
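
The by-reference requirement can be seen in plain JavaScript, independent of RxJS. In this small sketch (the names `sentinel`, `lookalike`, `isSentinel` and `arrivals` are made up for illustration), a source that happens to emit its own empty object is not mistaken for the signal:

```javascript
// A unique object used only as the end-of-stream signal (hypothetical names).
const sentinel = {};
// A structurally identical value that a source might legitimately emit.
const lookalike = {};

// Identity check, the inverse of a takeWhile predicate of the form x => x !== sentinel.
const isSentinel = (x) => x === sentinel;

// Values in the order they might arrive on the merged stream.
const arrivals = ['obs2: 0', lookalike, 'obs1: 0', sentinel, 'obs1: 1'];

// Keep values until the sentinel shows up, as takeWhile would.
const taken = [];
for (const x of arrivals) {
  if (isSentinel(x)) break;
  taken.push(x);
}

console.log(taken.length);           // 3
console.log(taken[1] === lookalike); // true: the look-alike passed through
```

A deep-equality predicate would have stopped at the look-alike instead, which is why the comparison has to be by identity.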

Here's an example:

const obs1$ = Rx.Observable.interval(1000)
    .map(x => `obs1: ${x}`)
    .take(5);

const obs2$ = Rx.Observable.interval(300)
    .map(x => `obs2: ${x}`)
    .take(9);

const signalFinishMessage = {};
const signalFinish$ = Rx.Observable.of(signalFinishMessage);

Rx.Observable.merge(obs1$.concat(signalFinish$), obs2$.concat(signalFinish$))
    .takeWhile(x => x !== signalFinishMessage)
    .subscribe(
        x => console.log(x),
        err => console.log('received error:', err),
        () => console.log('complete')
    );
    

Errors will also get propagated:

const obs1$ = Rx.Observable.interval(1000)
    .map(x => `obs1: ${x}`)
    .take(5);

const obs2$ = Rx.Observable.interval(300)
    .map(x => `obs2: ${x}`)
    .take(9)
    .concat(Rx.Observable.throw(`the world's about to end`));

const signalFinishMessage = {};
const signalFinish$ = Rx.Observable.of(signalFinishMessage);

Rx.Observable.merge(obs1$.concat(signalFinish$), obs2$.concat(signalFinish$))
    .takeWhile(x => x !== signalFinishMessage)
    .subscribe(
        x => console.log(x),
        err => console.log('received error:', err),
        () => console.log('complete')
    );
Betaine answered 19/12, 2017 at 16:3 Comment(0)

I ended up rolling my own:

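import { Observable } from 'rxjs';

// Merges the sources, but completes (or errors) as soon as any one of them does.
export function whileAll<T>(...observables: Observable<T>[]): Observable<T> {
  return new Observable<T>(function (observer) {
    if (observables.length === 0)
      observer.complete();
    else {
      // Forward every notification from every source to the single output observer.
      const next = observer.next.bind(observer);
      const error = observer.error.bind(observer);
      const complete = observer.complete.bind(observer);
      // Adding each inner subscription to the observer tears all of them down
      // once the observer terminates or is unsubscribed.
      for (let i = 0; i < observables.length; i++)
        observer.add(observables[i].subscribe(next, error, complete));
    }
  });
}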
Bentwood answered 8/11, 2019 at 7:41 Comment(0)
