RxJS 6 - Cancel / End a Pipe

I'm working with RxJS 6, and the pipe operator in particular. I currently use pipe to take the result of an API call and pass it through a series of additional tasks.

That all works, but I can't find a way to cancel or end a pipe when I encounter a problem. For example, I use the tap operator to check whether the value is null and then throw an error, but the pipe still appears to move on to the next task, in this case concatMap.

How do you end or cancel a pipe prematurely? Thanks in advance.

getData(id: String): Observable<any[]> {
  return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
    tap(evt => {
      if (evt == null) {
        return throwError(new Error("No data found..."));
      }
    }),
    concatMap(
      evt =>
        <Observable<any[]>>(
          this.http.get<any[]>(
            `${this.baseUrl}/path/relatedby/${evt.child_id}`
          ).map(res => ({ "response1": evt, "response2": res }))
        )
    ),
    retry(3),
    catchError(this.handleError("getData", []))
  );
}
Simulator answered 23/7, 2018 at 21:35 Comment(0)
S
20

You can also cancel or end a pipe by using a signal Subject together with the RxJS takeUntil operator.

Example

httpGetSafe(path: string): Observable<Data> {
  // Typed as Subject<void> so that next() can be called without an argument
  const stopSignal$ = new Subject<void>();

  return this.http.get<Data>(path).pipe(
    map(data => {
      const isBad = data === null;
      if (isBad) {
        stopSignal$.next();
      }
      return data;
    }),
    takeUntil(stopSignal$)
  );
}

Sometimes it's a bit simpler and more flexible than throwing errors...
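
A minimal runnable sketch of the same idea, assuming RxJS 6 or later. The source of(1, 2, null, 4) stands in for the HTTP call, and the names stop$ and log are illustrative only.

```typescript
import { Observable, of, Subject } from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';

// Signal used to end the pipe; it carries no value.
const stop$ = new Subject<void>();
const log: string[] = [];

// Assumed stand-in for a real source such as an HTTP call.
const source$: Observable<number | null> = of(1, 2, null, 4);

source$
  .pipe(
    map(value => {
      if (value === null) {
        // Fire the signal: takeUntil completes the stream right away,
        // so the null returned below never reaches the subscriber.
        stop$.next();
      }
      return value;
    }),
    takeUntil(stop$)
  )
  .subscribe({
    next: value => log.push(`next ${value}`),
    complete: () => log.push('complete'),
  });

console.log(log);
```

With this source the subscriber should see 1 and 2 and then a normal completion; the null and the trailing 4 are never delivered.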

Spallation answered 20/2, 2020 at 1:42 Comment(0)
T
12

I tried the basic concept from what you have with this stackblitz and it worked. It cancelled the remaining operations. See the link below.

https://stackblitz.com/edit/angular-4ctwsd?file=src%2Fapp%2Fapp.component.ts

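The differences I see between your code and mine are that I used a plain throw statement rather than throwError, and that I throw the error instead of returning it. throwError is an RxJS creation function that builds an Observable which errors once it is subscribed to, so returning it from tap raises nothing: tap ignores the callback's return value.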

Here is the code for reference:

import { Component } from '@angular/core';
import { of } from 'rxjs';
import { map, catchError, tap, retry } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent {
  name = 'Angular 6';

  constructor() {
    of('a', 'b', 'c', 'd')
      .pipe(
       map(x => {
        if (x === 'c') {
          throw 'An error has occurred';
        }
        return x;
       }),
       tap(x => console.log('In tap: ', x)),
       retry(3),
       catchError(() => of('caught error!'))
      )
      .subscribe(x => console.log(x));
  }
}
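
The difference between returning throwError(...) and using a throw statement can be checked with a small sketch. It assumes RxJS 6 or later; source$ and log are illustrative names, and of(null) stands in for an empty API response.

```typescript
import { Observable, of, throwError } from 'rxjs';
import { catchError, map, tap } from 'rxjs/operators';

const log: string[] = [];
const source$: Observable<string | null> = of(null);

// Case 1: an error Observable is returned from tap.
// tap ignores the return value, so nothing is raised and the null flows on.
source$
  .pipe(
    tap(value => (value == null ? throwError(new Error('No data found...')) : undefined)),
    map(value => `downstream saw ${value}`),
    catchError(() => of('caught error'))
  )
  .subscribe(value => log.push(`case 1: ${value}`));

// Case 2: a real throw inside the operator callback.
// The second map is skipped and catchError receives the error.
source$
  .pipe(
    map(value => {
      if (value == null) {
        throw new Error('No data found...');
      }
      return value;
    }),
    map(value => `downstream saw ${value}`),
    catchError(() => of('caught error'))
  )
  .subscribe(value => log.push(`case 2: ${value}`));

console.log(log);
```

Only the second case skips the downstream operator, which matches the behaviour described in the question.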
Transportation answered 23/7, 2018 at 22:30 Comment(0)
S
9

RxJS can throw an error to stop execution. The operator you applied in the pipe was just the wrong choice.

The tap operator that you are using is only meant for side effects, e.g. making changes to the DOM, calling console.log, or changing the values of variables on your component (e.g. this.counter = someValue). The tap operator was not meant to change the RxJS stream; it just passes along the same values it received. https://rxjs-dev.firebaseapp.com/api/operators/tap

On the other hand, operators that do work on the stream, like 'map', can throw errors. See this stackblitz

In summary, the code is

getData(): Observable<any[]> {
    return this.httpGet('basePath').pipe(
      map(evt => {
        if (evt === null) {
          // Throw inside map; returning throwError(...) would only emit an Observable as a value
          throw new Error("No data found...");
        }
        return evt;
      }),
      concatMap(evt => {
        return this.httpGet(`childPath/${evt.serverResult}`); 
      }),
      map(res => {
        return {
          "response2": res.serverResult
        };
      }),
      retry(3),
      catchError(error => {
        console.log('error handled!');
        return of(null);
      })
    )
  }

  private httpGet(someString: string): Observable<{ serverResult: number }> {
    return timer(1000).pipe( // fake a server call that takes 1 second
      map(_ => { // fake a server result
      // Comment out the valid return and uncomment the null return to see it in action
        //return null;
        return { 
          serverResult: 1 
        }
      }) 
    );
  }

If you don't want to transform the error into a valid value in the stream, handle it in the error callback on the subscribe function.
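
As a rough sketch of that last point, assuming RxJS 6 or later: without catchError in the pipe, the thrown error reaches the error callback of the observer passed to subscribe. The source$ and log names are illustrative, and of(null) fakes an empty server response.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

const log: string[] = [];

// Assumed stand-in for a server call that returned nothing.
const source$: Observable<{ serverResult: number } | null> = of(null);

source$
  .pipe(
    map(evt => {
      if (evt === null) {
        throw new Error('No data found...');
      }
      return evt.serverResult;
    })
  )
  .subscribe({
    next: value => log.push(`value: ${value}`),
    // No catchError in the pipe, so the error lands here.
    error: (err: Error) => log.push(`error: ${err.message}`),
    // Not called when the stream ends with an error.
    complete: () => log.push('complete'),
  });

console.log(log);
```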

Spermaceti answered 24/8, 2019 at 21:20 Comment(0)
L
8

If you just want to end the execution of the pipe without throwing an error you can return EMPTY.

myObservable.pipe(
  switchMap(c => c === null ? EMPTY : of(c)),
  tap(() => console.log('If c is null this is not executed anymore'))
);

Also the takeWhile operator is an option.

myObservable.pipe(
  takeWhile(c => c !== null),
  tap(() => console.log('If c is null this is not executed anymore'))
);

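Both approaches skip the rest of the pipe for a null value, but they differ in scope. switchMap with EMPTY only drops that one value, so later values from the source still pass through. takeWhile completes the whole stream the first time the predicate fails, so nothing after the null is delivered.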
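
A small sketch that runs the two snippets above against the same multi-value source, assuming RxJS 6 or later. The arrays viaSwitchMap and viaTakeWhile are illustrative names.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { switchMap, takeWhile } from 'rxjs/operators';

const source$: Observable<number | null> = of(1, null, 3);

const viaSwitchMap: Array<number | null> = [];
const viaTakeWhile: Array<number | null> = [];

// EMPTY replaces the null with "nothing"; the outer stream keeps going.
source$
  .pipe(switchMap(c => (c === null ? EMPTY : of(c))))
  .subscribe(c => viaSwitchMap.push(c));

// takeWhile completes the stream as soon as the predicate returns false.
source$
  .pipe(takeWhile(c => c !== null))
  .subscribe(c => viaTakeWhile.push(c));

console.log(viaSwitchMap, viaTakeWhile);
```

For a source that emits a single value, such as one HTTP response, the two behave the same from the subscriber's point of view.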

Lax answered 25/8, 2021 at 8:10 Comment(1)
The switchMap suggestion in part 1 worked great. Thank you. The takeWhile cancelled the entire pipe forever. - Washwoman
E
4

I'd like to suggest a different approach. This is in case you don't want to throw an error, just handle the case. Create two observables and return one of them in a switchMap based on an if check.

getData(id: String): Observable<any> {
  return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
    switchMap(evt => {
      if (evt == null) {
        // The chain ends here:
        // the subscriber receives null and the stream completes normally
        return of(null);
      }
      // This is the alternative chain.
      // You can build further logic here with RxJS operators.
      return this.http.get<any[]>(
        `${this.baseUrl}/path/relatedby/${evt.child_id}`
      ).pipe(
        map(res => ({ "response1": evt, "response2": res }))
      );
    })
  );
}

Pardon any typos, this is just a quick example to demonstrate the approach.
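
For a version that runs without an HTTP backend, here is a hedged sketch of the same branching. fetchParent and fetchRelated are hypothetical stand-ins for the two http.get calls, and the Parent and Combined interfaces are assumptions made for the example. It assumes RxJS 6 or later.

```typescript
import { Observable, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

interface Parent { child_id: number; }
interface Combined { response1: Parent; response2: string[]; }

// Hypothetical replacement for the first HTTP call.
function fetchParent(id: string): Observable<Parent | null> {
  return of(id === 'missing' ? null : { child_id: 7 });
}

// Hypothetical replacement for the second HTTP call.
function fetchRelated(childId: number): Observable<string[]> {
  return of([`related-to-${childId}`]);
}

function getData(id: string): Observable<Combined | null> {
  return fetchParent(id).pipe(
    switchMap((evt): Observable<Combined | null> => {
      if (evt == null) {
        // Short branch: no second request is made.
        return of(null);
      }
      const parent: Parent = evt;
      // Long branch: run the follow-up request and combine both results.
      return fetchRelated(parent.child_id).pipe(
        map(res => ({ response1: parent, response2: res }))
      );
    })
  );
}

const results: Array<Combined | null> = [];
getData('missing').subscribe(r => results.push(r));
getData('42').subscribe(r => results.push(r));

console.log(JSON.stringify(results));
```

The subscriber has to handle the null case itself, which is the trade-off of not raising an error.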

Equities answered 27/5, 2021 at 11:26 Comment(0)
P
2

Observable functions are always wrapped internally in a try/catch block. Any error thrown in the stream will end the stream and call any error callbacks to subscribers or operators.

The problem here is with throwError(). It is not a pipeable operator: it creates an Observable that emits an error once it is subscribed to. Here it is returned from tap, which ignores the return value, so that Observable is never subscribed to and no error is raised.

tap is normally used only for side effects, as it cannot change the values in the stream. However, because of the try/catch wrapping mentioned above, you can simply throw a new Error inside it and the stream will take care of the rest.

getData(id: String): Observable<any[]> {
  return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
    tap(evt => {
      if (evt == null) {
        throw new Error("No data found...");
      }
    }),
    concatMap(
      evt =>
        <Observable<any[]>>(
          this.http.get<any[]>(
            `${this.baseUrl}/path/relatedby/${evt.child_id}`
          ).map(res => ({ "response1": evt, "response2": res }))
        )
    ),
    retry(3),
    catchError(this.handleError("getData", []))
  );
}
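
The try/catch wrapping described above can be pictured with a deliberately simplified stand-in. The Observer type and the tapLike helper below are hypothetical and are not RxJS's actual implementation; they only sketch why a throw inside a callback ends the stream with an error.

```typescript
// Simplified stand-in for a subscriber; NOT RxJS's real types.
type Observer<T> = {
  next: (value: T) => void;
  error: (err: unknown) => void;
};

// Hypothetical tap-like wrapper: runs a side effect and forwards the value unchanged.
function tapLike<T>(sideEffect: (value: T) => void, downstream: Observer<T>): Observer<T> {
  let stopped = false;
  return {
    next(value: T): void {
      if (stopped) {
        return;
      }
      try {
        sideEffect(value); // any return value is ignored
      } catch (err) {
        // A thrown error stops the stream and is forwarded as an error notification.
        stopped = true;
        downstream.error(err);
        return;
      }
      downstream.next(value);
    },
    error(err: unknown): void {
      if (stopped) {
        return;
      }
      stopped = true;
      downstream.error(err);
    },
  };
}

const log: string[] = [];
const sink: Observer<string | null> = {
  next: value => log.push(`next: ${value}`),
  error: err => log.push(`error: ${(err as Error).message}`),
};

const checked = tapLike<string | null>(value => {
  if (value == null) {
    throw new Error('No data found...');
  }
}, sink);

checked.next('a');
checked.next(null);
checked.next('b'); // ignored: the stream already ended with an error

console.log(log);
```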
Parvati answered 24/7, 2018 at 2:1 Comment(0)
