Completing the Outer Observable with the Inner
There are a number of ways to cause the outer observable to complete with the inner. (The next section explains why you might not want to do this, followed by an example of detecting inner observable completion when the outer observable doesn't complete.)
If you know your inner observable will emit only one value before completing, as with an API call, you can just pipe first onto your outer observable.
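const { of } = rxjs;
const { switchMap, first } = rxjs.operators;
const stream = of(1, 2, 3).pipe(
  // Each outer value switches to a new inner observable that emits once.
  switchMap(() => of(4)),
  // Take the first value that comes through, then complete.
  first()
)
.subscribe({
  next: (x) => console.log(x),
  complete: () => console.log('outer complete')
});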
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>
But if the inner observable emits multiple values, a simple change is to use endWith and takeWhile to tell the outer observable when to complete. This assumes we know the inner observable will never emit null (or undefined, since the x != null check matches both).
const { of } = rxjs;
const { switchMap, endWith, takeWhile } = rxjs.operators;
const stream = of(1, 2, 3).pipe(
  switchMap(() => of(4, 5, 6).pipe(
    // Append null as a sentinel marking the end of the inner observable.
    endWith(null)
  )),
  // Complete the outer observable as soon as the sentinel arrives.
  takeWhile((x) => x != null)
)
.subscribe({
  next: (x) => console.log(x),
  complete: () => console.log('outer complete')
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>
A general solution is to have a Subject emit when the inner observable completes, and to have the outer observable complete when the Subject emits, watching for it with takeUntil.
const { of, Subject } = rxjs;
const { switchMap, tap, takeUntil } = rxjs.operators;
// Emits once whenever an inner observable completes.
const innerComplete = new Subject();
const stream = of(1, 2, 3).pipe(
  switchMap(() => of(4, 5, 6).pipe(
    tap({
      // Notify the Subject when the inner observable completes.
      complete: () => innerComplete.next()
    })
  )),
  // Complete the outer observable when the Subject emits.
  takeUntil(innerComplete)
)
.subscribe({
  next: (x) => console.log(x),
  complete: () => console.log('outer complete')
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>
Why Doesn't It Complete?
When I first started working with RxJS I was mainly converting existing API calls to be handled with observables. In practice this meant that an outer observable would complete when the inner observable completed. But it is important to note that the outer observable was not caused to complete because the inner observable did. It completed because the outer observable itself emitted only one value and then completed; with switchMap, the result completes once the outer observable has completed and the last inner observable has finished. If it was an observable that could emit multiple values, like from mouse click events, it would not complete with the inner observable.
This is a good thing. It allows you to have an outer observable that maps its emissions through an inner observable, without it completing the first time the inner observable does. For example, let's say you wanted to trigger an animation on each mouse click, and the animation is controlled by a timer. The mouse clicks would be emitted by the outer observable, and the inner observable would run a timer for a few seconds to control the animation. After the animation completes, you'd still like mouse click events to be captured so the animation can start up again.
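To make the completion rule concrete, here is a minimal sketch in plain JavaScript, with no RxJS involved. It assumes a toy model in which an "observable" is just a function that takes an observer and calls it synchronously. The names toySwitchMap, fromValues, and neverCompleting are hypothetical helpers written for this illustration. The model ignores errors, unsubscription, and timing, so it is not how RxJS implements switchMap, only a picture of the rule described above.

```javascript
// Toy model only: an "observable" is a function that takes an observer
// ({ next, complete }) and calls it synchronously. This is NOT RxJS.

// Hypothetical helper: emits the given values, then completes.
function fromValues(...values) {
  return (observer) => {
    values.forEach((v) => observer.next(v));
    observer.complete();
  };
}

// Hypothetical helper: emits the given values and never completes,
// standing in for an open-ended source such as mouse clicks.
function neverCompleting(...values) {
  return (observer) => {
    values.forEach((v) => observer.next(v));
  };
}

// Hypothetical toy switchMap: the result completes only when the source
// has completed AND the latest inner observable has completed.
function toySwitchMap(project) {
  return (source) => (observer) => {
    let sourceDone = false;
    let innerActive = false;
    let latestId = 0; // inners older than the latest one are ignored

    const maybeComplete = () => {
      if (sourceDone && !innerActive) observer.complete();
    };

    source({
      next: (value) => {
        const id = ++latestId;
        innerActive = true;
        project(value)({
          next: (x) => {
            if (id === latestId) observer.next(x);
          },
          complete: () => {
            if (id !== latestId) return;
            innerActive = false;
            maybeComplete(); // inner finishing alone is not enough
          },
        });
      },
      complete: () => {
        sourceDone = true;
        maybeComplete();
      },
    });
  };
}

// Runs a source through the toy switchMap and records what comes out.
function run(source) {
  const log = [];
  toySwitchMap(() => fromValues('a', 'b'))(source)({
    next: (x) => log.push(x),
    complete: () => log.push('outer complete'),
  });
  return log;
}

const finiteLog = run(fromValues(1)); // like a single API call
const openEndedLog = run(neverCompleting(1, 2)); // like mouse clicks

console.log(finiteLog); // a, b, outer complete
console.log(openEndedLog); // a, b, a, b (never completes)
```

In the finite case the inner observable finishes before the source does, so completion is reported only once the source itself completes. In the open-ended case every inner observable finishes, yet 'outer complete' is never logged.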
The following snippet will log a series of numbers to the console (our makeshift animation) on each click. And since we're using switchMap, the previous "animation" will stop if you click in the middle of it. (The concatMap piece just adds a delay between each emission.) You can see this visually in the marble diagram for switchMap at https://rxmarbles.com/#switchMap
const { of, fromEvent } = rxjs;
const { switchMap, concatMap, delay } = rxjs.operators;
const stream = fromEvent(document, 'click').pipe(
  // Each click cancels the running "animation" and starts a new one.
  switchMap(() => of(1, 2, 3).pipe(
    // Space the emissions 500 ms apart.
    concatMap(x => of(x).pipe(delay(500)))
  ))
)
.subscribe({
  next: (x) => console.log(x),
  // Never logged here: the click stream does not complete.
  complete: () => console.log('outer complete')
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>
<p>Click here and watch the console.</p>
Acting on Inner Observable Completion
Given that the outer observable doesn't need to complete when the inner observable does, you may want a way to do something when the inner observable completes without completing the outer observable. The tap operator lets you do that when you pass it an observer object with a complete callback.
const { of } = rxjs;
const { switchMap, tap } = rxjs.operators;
const stream = of(1, 2, 3).pipe(
  switchMap(() => of(4, 5, 6).pipe(
    tap({
      // Runs each time an inner observable completes.
      complete: () => console.log("Inner observable completed")
    })
  ))
)
.subscribe({
  next: (x) => console.log(x),
  complete: () => console.log('Outer observable completed')
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>