When using rxjs why doesn't switchMap trigger a complete event?

Recently, in my Angular app, I've started to use the RxJS switchMap operator in a couple of different scenarios. I soon realised that when you subscribe to a stream that uses switchMap, the completion block does not fire (I don't think the error block does either). None of the examples I've seen online handle a completion block either, and I'm baffled as to why.

I'm obviously missing something in regard to switchMap or how it is used, but I don't know what.

I'd ideally like to call a function which triggers an HTTP request, deal with any error in the error block, and then handle post-request work in the completion block.

Here's my example of what I'm doing:

export class ResultsComponent {

  ngAfterViewInit() {

    Observable.combineLatest(...filters)
        .debounceTime(500)
        .distinctUntilChanged()
        .switchMap((activeFilters: Array<ActiveFilter>) => {
            const filters = this.mapFilters(activeFilters);
            return this.doSearch(this.term$.getValue(), filters);
        })
        .subscribe((res) => {
            this.onSearchSuccess(res);
        },
        (err) => {
            // THIS NEVER FIRES
            console.error(err);
            this.loading$.next(false);
        },
        () => {
            // THIS NEVER FIRES
            this.loading$.next(false);
        });
  }

  private doSearch(input: string, filters: object): Observable<object> {
    return this.searchService.search(input, filters);
  }
}

And the service:

export class SearchService {

  private baseUrl: string = 'http://mydomainhere.com/api';

  constructor(private http: Http) {}

  public search(input: string, filters: object): Observable<object> {
    const params = {
      "keyword": input,
      "filters": filters
    };
    const url = `${this.baseUrl}/search`;
    return this.http.post(url, params)
       .map(res => res.json())
       .catch(this.handleError);
  }
}
Rideout answered 31/10, 2017 at 9:27 Comment(2)
Why would it complete? Do all of the input filters observables complete?Smtih
@Smtih I'm just trying to better understand how switchMap works.Rideout

With switchMap, the completion of the inner observable does not complete the stream unless the outer observable has already completed. Here is an example illustrating this:

const first = Rx.Observable.interval(2000).take(2)
	.do(console.log.bind(null, 'first next'),
      console.log.bind(null, 'first error'),
      console.log.bind(null, 'first complete'));
const second = Rx.Observable.interval(200).take(2)
	.do(console.log.bind(null, 'second next'),
      console.log.bind(null, 'second error'),
      console.log.bind(null, 'second complete'));

first.switchMap(() => second)
	.subscribe(console.log.bind(null, 'stream next'),
      console.log.bind(null, 'stream error'),
      console.log.bind(null, 'stream complete'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
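
The rule stated above can also be written down as a small dependency-free model. This is only a sketch of the behaviour, not RxJS's actual implementation: a source is modelled as a function that accepts an observer and returns an unsubscribe function, and `createSubject` and `switchMapModel` are hypothetical helpers written for this illustration.

```javascript
// Simplified model of switchMap's completion rule (not the RxJS source).
// A "source" is a function that takes an observer and returns an unsubscribe function.

// Hypothetical helper: a source we can push notifications into by hand.
function createSubject() {
  const observers = new Set();
  return {
    source: (observer) => {
      observers.add(observer);
      return () => observers.delete(observer);
    },
    next: (value) => [...observers].forEach((o) => o.next(value)),
    complete: () => [...observers].forEach((o) => o.complete()),
  };
}

function switchMapModel(outer, project) {
  return (observer) => {
    let outerDone = false;
    let innerActive = false;
    let unsubscribeInner = () => {};

    // The result completes only when the outer is done AND no inner is running.
    const completeIfFinished = () => {
      if (outerDone && !innerActive) observer.complete();
    };

    const unsubscribeOuter = outer({
      next: (value) => {
        unsubscribeInner(); // switch: drop the previous inner source
        innerActive = true;
        unsubscribeInner = project(value)({
          next: (v) => observer.next(v),
          complete: () => {
            innerActive = false;
            completeIfFinished();
          },
        });
      },
      complete: () => {
        outerDone = true;
        completeIfFinished();
      },
    });

    return () => {
      unsubscribeInner();
      unsubscribeOuter();
    };
  };
}

const log = [];
const outer = createSubject();
const inner = createSubject();

switchMapModel(outer.source, () => inner.source)({
  next: (v) => log.push(`stream next ${v}`),
  complete: () => log.push('stream complete'),
});

outer.next('a');  // subscribes to the inner source
inner.next(1);
inner.complete(); // inner is done, but the outer is still open: no 'stream complete' yet
log.push('inner completed');
outer.complete(); // nothing is pending any more, so the stream completes now

console.log(log);
// [ 'stream next 1', 'inner completed', 'stream complete' ]
```

In the model, 'stream complete' is logged only after `outer.complete()`, which mirrors what the snippet above shows with real intervals.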

An error thrown in the inner observable will call the error block on the outer observable. Here is an example illustrating this:

const source = Rx.Observable.interval(2000).take(4)
	.do(console.log.bind(null, 'source next'),
      console.log.bind(null, 'source error'),
      console.log.bind(null, 'source complete'));
const error = Rx.Observable.create((o) => {
  o.error();
}).do(console.log.bind(null, 'error next'),
      console.log.bind(null, 'error error'),
      console.log.bind(null, 'error complete'));

source.switchMap(() => error)
	.subscribe(console.log.bind(null, 'stream next'),
      console.log.bind(null, 'stream error'),
      console.log.bind(null, 'stream complete'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>

So you can put a catch on the outer observable and get the error if you like.
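
One consequence worth spelling out: an error that reaches the subscriber also ends the stream, so later outer values are never processed, whereas an error handled inside the inner observable leaves the stream running. The sketch below models this without RxJS. It is a simplification under stated assumptions, not the library's implementation, and `switchMapModel`, `request`, `catchInner` and `run` are hypothetical names made up for this example.

```javascript
// Simplified model (not RxJS itself): a "source" is a function that pushes
// notifications into an observer of the shape { next, error, complete }.

// Model of switchMap: forwards inner values, and forwards the first error
// from either side, after which nothing else is delivered.
function switchMapModel(outer, project) {
  return (observer) => {
    let stopped = false;
    let outerDone = false;
    let activeInner = 0; // id of the current inner source, 0 when none is running
    let lastId = 0;

    const fail = (err) => {
      if (stopped) return;
      stopped = true;
      observer.error(err);
    };
    const completeIfFinished = () => {
      if (!stopped && outerDone && activeInner === 0) {
        stopped = true;
        observer.complete();
      }
    };

    outer({
      next: (value) => {
        if (stopped) return; // an earlier error already ended the stream
        const id = ++lastId;
        activeInner = id; // switching: older inner sources are now ignored
        project(value)({
          next: (v) => { if (!stopped && id === activeInner) observer.next(v); },
          error: (e) => { if (id === activeInner) fail(e); },
          complete: () => {
            if (id === activeInner) { activeInner = 0; completeIfFinished(); }
          },
        });
      },
      error: fail,
      complete: () => { outerDone = true; completeIfFinished(); },
    });
  };
}

// Hypothetical inner request that fails for the value 2.
const request = (n) => (observer) => {
  if (n === 2) {
    observer.error(new Error('request 2 failed'));
  } else {
    observer.next(`result ${n}`);
    observer.complete();
  }
};

// Model of catching inside the inner source: swap the error for a fallback value.
const catchInner = (source, fallback) => (observer) => source({
  next: (v) => observer.next(v),
  error: () => { observer.next(fallback); observer.complete(); },
  complete: () => observer.complete(),
});

// Outer source that emits 1, 2, 3 and then completes.
const outer = (observer) => {
  [1, 2, 3].forEach((n) => observer.next(n));
  observer.complete();
};

// Subscribes to a stream and records every notification it receives.
const run = (stream) => {
  const log = [];
  stream({
    next: (v) => log.push(v),
    error: (e) => log.push(`error: ${e.message}`),
    complete: () => log.push('complete'),
  });
  return log;
};

const uncaught = run(switchMapModel(outer, request));
const caught = run(switchMapModel(outer, (n) => catchInner(request(n), 'fallback')));

console.log(uncaught); // [ 'result 1', 'error: request 2 failed' ]
console.log(caught);   // [ 'result 1', 'fallback', 'result 3', 'complete' ]
```

In the uncaught run the value 3 is never requested and no completion is reported, because the error was the final notification. In real code the equivalent choice is whether the catch sits on the outer chain or on the observable returned inside switchMap.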

If you want to observe the completion of the inner observable then you will have to observe it inside of the switchMap.

As for why you don't see much about using the completion block online, I can't speak for everyone, but personally I don't find myself needing it much in my application. I just care about the data that comes out of next.

Demisemiquaver answered 31/10, 2017 at 12:0 Comment(1)
great. that gives a good indication of the inner-workings of switchMap - thanks.Rideout

Completing the Outer Observable with the Inner

There are a number of ways to cause the outer observable to complete with the inner. (The next section explains why you might not want to do this, followed by an example of detecting inner observable completion when the outer observable doesn't complete.)

If you know your inner observable will only emit one value before completing, like with an API call, you can just pipe first onto your outer observable.

const { of } = rxjs;
const { switchMap, first } = rxjs.operators;

const stream = of(1, 2, 3).pipe(
    switchMap(() => of(4)),
    first()
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>

But if the inner observable emits multiple values, a simple change is to use endWith and takeWhile to tell the outer observable when to complete. This assumes we know the inner observable will never emit null.

const { of } = rxjs;
const { switchMap, endWith, takeWhile } = rxjs.operators;

const stream = of(1, 2, 3).pipe(
    switchMap(() => of(4, 5, 6).pipe(
      endWith(null)
    )),
    takeWhile((x) => x != null)
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>

A general solution is to have a Subject emit when the inner observable completes, and have the outer observable complete when the Subject emits, watching for it with takeUntil.

const { of, Subject } = rxjs;
const { switchMap, tap, takeUntil } = rxjs.operators;

const innerComplete = new Subject();

const stream = of(1, 2, 3).pipe(
    switchMap(() => of(4, 5, 6).pipe(
      tap({
        complete: () => innerComplete.next()
      })
    )),
    takeUntil(innerComplete)
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>

Why Doesn't It Complete?

When I first started working with RxJS I was mainly converting existing API calls to be handled with observables. In practice this meant that an outer observable would complete when the inner observable completed. But it is important to note the outer observable was not caused to complete because the inner observable did. It completed because it would only emit one value. If it was an observable that could emit multiple values, like from mouse click events, it would not complete with the inner observable.

This is a good thing. It allows you to have an outer observable that maps its emissions through an inner observable without completing the first time the inner observable does. For example, let's say you want to trigger an animation on each mouse click, and the animation is controlled by a timer. The mouse clicks are emitted by the outer observable, and the inner observable runs a timer for a few seconds to control the animation. After the animation completes, you still want mouse click events to be captured so the animation can start again.

The following snippet logs a series of numbers to the console (our makeshift animation) on each click. Because we're using switchMap, the previous "animation" stops if you click in the middle of it. (The concatMap piece just adds a delay between each emission.) You can see this visually in the marble diagram for switchMap at https://rxmarbles.com/#switchMap

const { of, fromEvent } = rxjs;
const { switchMap, concatMap, delay } = rxjs.operators;

const stream = fromEvent(document, 'click').pipe(
    switchMap(() => of(1, 2, 3).pipe(
      concatMap(x => of(x).pipe(delay(500)))
    ))
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('outer complete')
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>

<p>Click here and watch the console.</p>

Acting on Inner Observable Completion

Since it makes sense that the outer observable doesn't have to complete when the inner observable does, you may want a way to act on the inner observable's completion without completing the outer one. tap lets you do that when you pass it an Observer as its argument.

const { of } = rxjs;
const { switchMap, tap } = rxjs.operators;

const stream = of(1, 2, 3).pipe(
    switchMap(() => of(4, 5, 6).pipe(
      tap({
        complete: () => console.log('Inner observable completed')
      })
    ))
  )
  .subscribe({
    next: (x) => console.log(x),
    complete: () => console.log('Outer observable completed')
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.7/rxjs.umd.js"></script>
Rigorism answered 2/6, 2021 at 9:46 Comment(0)