rxjs execute tap only at the first time
C

8

28

I want to execute tap() only when I get the first emitted value.

Something like:

Observable
  .pipe(
     tap(() => { /* execute only when I get the first emitted value */ })
  )
  .subscribe(() => {
     // .....
  })
Cyclotron answered 8/1, 2019 at 20:27 Comment(4)
There is a first() operator, among others; you can use it inside .pipe(). - Herwick
See github.com/cartant/rxjs-etc/blob/master/source/operators/… and its dual github.com/cartant/rxjs-etc/blob/master/source/operators/… - Satin
stream$.pipe(first() /* same as take(1) */, tap(...)).subscribe(() => {}) - Kinsley
first and take will automatically unsubscribe (complete the stream) after one emission. - Jigging
H
34

You can use the index argument that mapping operators such as concatMap pass to their callback. Unlike other approaches, this is fully flexible about which emission triggers the tap: for example, the second emission (index === 1), or any predicate such as index % 2 === 0.

// these are because of using rxjs from CDN in code snippet, ignore them
const {of, interval} = rxjs;
const {take, tap, concatMap} = rxjs.operators;


// main code
const stream = interval(250).pipe(take(4))

stream.pipe(
  concatMap((value, index) => index === 0
    ? of(value).pipe(
        tap(() => console.log('tap'))
      )
    : of(value)
  )
)
.subscribe(x => console.log(x));
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/rxjs.umd.js"></script>
Hardtop answered 8/1, 2019 at 20:55 Comment(0)
Z
18

If I understand correctly, you want to execute tap() only once, when the stream is subscribed to, and not again afterwards. This is my custom operator for that:

import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

export function startWithTap<T>(callback: () => void) {
  return (source: Observable<T>) =>
    of({}).pipe(tap(callback), switchMap((o) => source));
}

An example usage of this operator:

this.api.getData().pipe(
  startWithTap(() => this.loading.start()),
)

This is taken from my actual code, where loading starts when someone subscribes to the Observable created by the API service (by means of HttpClient).


UPDATE

Use this instead of the above implementation, because it needs only defer rather than of, tap and switchMap:

import { defer, Observable } from 'rxjs';

export function startWithTap<T>(callback: () => void) {
  return (source: Observable<T>) =>
    defer(() => {
      callback();
      return source;
    });
}
Zobias answered 9/1, 2019 at 9:19 Comment(5)
Why is the updated solution better? - Undesirable
@Undesirable The updated solution works the same way using only 1 RxJS operator instead of 3, hence less complexity. - Zobias
This solution actually behaves differently from what the OP asked for. It calls the callback when there is a subscription, not on the first emission. - Lectra
That's true, but some people want this behavior and end up on this SO question. - Zobias
Adding a gotcha here: neither of these solutions allows the original observable to be delayed. However, jBuchholz's answer does, so I'd recommend going with that if delaying is something you need. - Shattuck
O
9

I like the approach of jal's answer and would suggest wrapping it in its own operator:

import { of, OperatorFunction } from 'rxjs';
import { concatMap } from 'rxjs/operators';

export function tapOnce<T>(tapFn: (t: T) => void, tapIndex = 0): OperatorFunction<T, T> {
  return source$ => source$.pipe(concatMap((value, index) => {
    if (index === tapIndex) {
      tapFn(value);
    }
    return of(value);
  }));
}

Usage would look like this:

stream.pipe(tapOnce(() => console.log('tapping once'), 1));

This could even be abstracted further to an operator that takes a function to determine whether it should tap according to the given value/index:

export function tapWhen<T>(tapFn: (t: T) => void, evaluateFn: (index: number, t: T) => boolean): OperatorFunction<T, T> {
  return source$ => source$.pipe(concatMap((value, index) => {
    if (evaluateFn(index, value)) {
      tapFn(value);
    }
    return of(value);
  }));
}
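
As a rough illustration of what evaluateFn predicates could look like, the sketch below defines a few with the same (index, value) signature that tapWhen expects and checks which of five sample values each one would select. The names EvaluateFn, firstOnly, evenIndices, largeValues and tappedBy, as well as the sample data, are made up for illustration, and a plain array stands in for the stream so the snippet runs without RxJS.

```typescript
// Same shape as the `evaluateFn` parameter of `tapWhen` above (hypothetical alias).
type EvaluateFn<T> = (index: number, value: T) => boolean;

// Illustrative predicates; none of these names come from RxJS.
const firstOnly: EvaluateFn<unknown> = (index) => index === 0;
const evenIndices: EvaluateFn<unknown> = (index) => index % 2 === 0;
const largeValues: EvaluateFn<number> = (_index, value) => value > 100;

// A plain array stands in for five emissions of a stream.
const sampleEmissions = [5, 500, 50, 5000, 1];

// Returns the values for which the given predicate would trigger the tap.
const tappedBy = (fn: EvaluateFn<number>): number[] =>
  sampleEmissions.filter((value, index) => fn(index, value));

console.log(tappedBy(firstOnly));   // [5]
console.log(tappedBy(evenIndices)); // [5, 50, 1]
console.log(tappedBy(largeValues)); // [500, 5000]
```

Any of these could be passed as the second argument, for example stream.pipe(tapWhen(v => console.log(v), evenIndices)).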
Olivia answered 7/1, 2021 at 10:3 Comment(0)
H
5

If anyone is interested, here is a very simple implementation of tapN. It executes the specified callback for each emission until the number of emissions reaches nEmissions. To execute the callback only for the first element you would use tapN(1), but you could also execute it for the first 3 emissions with tapN(3), for example.

import { defer, Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

/* Executes the specified callback function for each emission until the number of emissions is equal to nEmissions */
export const tapN = <T>(nEmissions: number, callback: (item: T) => void) => (source$: Observable<T>): Observable<T> =>
    defer(() => {
        let counter = 0;
        return source$.pipe(tap((item) => {
            if (counter < nEmissions) {
                callback(item);
                counter++;
            }
        }));
    });

In your code:

Observable
  .pipe(
     tapN(1, () => { /* this code would be only executed on the first emitted value */ })
  )
  .subscribe(() => {
     // .....
  })
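
The counting logic can also be looked at separately from RxJS. The sketch below is only an illustration under assumptions, not part of the operator above: firstN is a hypothetical helper that wraps a callback so it runs only for the first n calls, and a plain array stands in for the emissions.

```typescript
// Hypothetical helper (not part of RxJS): wraps `callback` so that it only
// runs for the first `n` invocations and is a no-op afterwards.
function firstN<T>(n: number, callback: (value: T) => void): (value: T) => void {
  let calls = 0;
  return (value: T): void => {
    if (calls < n) {
      calls++;
      callback(value);
    }
  };
}

// Simulate three emissions reaching the wrapped callback.
const tappedValues: number[] = [];
const onlyFirst = firstN<number>(1, (v) => tappedValues.push(v));
[10, 20, 30].forEach((v) => onlyFirst(v));

console.log(tappedValues); // [10]
```

With RxJS, such a wrapper could be handed to tap(), for example defer(() => source$.pipe(tap(firstN(1, cb)))). Creating it inside defer(), as tapN does with its counter, gives every subscription a fresh count; without defer() all subscribers would share one counter.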
Heliograph answered 23/1, 2020 at 14:31 Comment(0)
B
3

Apart from the already mentioned options, you can also use multicast.

multicast(new Subject(), s => concat(
  s.pipe(
    take(1),
    tap(v => console.log('tap', v)),
  ),
  s
))

Live demo: https://stackblitz.com/edit/rxjs-shvuxm

Bowlds answered 9/1, 2019 at 10:45 Comment(1)
You have to provide a subject factory function, () => new Subject(), for this to work properly. Providing just one Subject will mess up the values the observable emits if you subscribe multiple times, see: stackblitz.com/edit/rxjs-pul1gw?file=index.ts. - Rockyrococo
L
0

You can share() your main Observable like below:

import { timer } from 'rxjs';
import { tap, mapTo, share, shareReplay } from 'rxjs/operators';

const source$ = timer(1000)
.pipe(
  tap((v) => console.log('SIDE EFFECT')),
  mapTo('RESULT')
)
const sharedSource$ = source$.pipe(share());
// or shareReplay(1) if you want every subscriber to get the last value even if it subscribes later

sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);

https://stackblitz.com/edit/typescript-qpnbkm?embed=1&file=index.ts

This example is similar to the one in learn-rxjs.

Lemonade answered 15/4, 2020 at 20:45 Comment(0)
S
0

This will execute only on the first emit:

skipWhile((x, i) => {
    if (i === 0) /* Do something */;
    return false;
}),

If you want the callback to execute every time, you can use this:

filter((x, i) => {
    if (i === 0) /* Do something */;
    return true;
}),
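
The practical difference between the two snippets is how often the predicate runs: skipWhile stops consulting its predicate once it has returned false, while filter calls it for every value. The sketch below illustrates this with plain arrays standing in for the stream; skipWhileArray is a hypothetical stand-in written for this example, not an RxJS function.

```typescript
// Hypothetical array-based stand-in for skipWhile: the predicate is consulted
// only until it first returns false; after that every value passes through.
function skipWhileArray<T>(
  values: T[],
  predicate: (value: T, index: number) => boolean
): T[] {
  const out: T[] = [];
  let skipping = true;
  values.forEach((value, index) => {
    if (skipping && predicate(value, index)) {
      return; // still skipping, drop this value
    }
    skipping = false;
    out.push(value);
  });
  return out;
}

// Count how often each predicate is invoked for three values.
let skipWhileCalls = 0;
const passedThrough = skipWhileArray([1, 2, 3], () => {
  skipWhileCalls++;
  return false; // never skip anything
});

let filterCalls = 0;
const keptValues = [1, 2, 3].filter(() => {
  filterCalls++;
  return true; // keep everything
});

console.log(skipWhileCalls, filterCalls); // 1 3
```

In both cases all three values pass through unchanged; only the number of predicate calls differs.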
Sisak answered 4/5, 2024 at 11:2 Comment(0)
H
-1

(Updating my earlier incorrect answer)

Based on cartant's comment and the link he provided: he has already created an operator that does this, available in the 'rxjs-etc' package. A solution based on his operator is to install 'rxjs-etc', then:

import { tap } from 'rxjs/operators';
import { initial } from 'rxjs-etc/operators';

observable$.pipe(
    initial(src$ => src$.pipe(tap(() => {/* execute only on first value */})))
).subscribe(() => {
    // .....
})

Working StackBlitz example.

Helicopter answered 8/1, 2019 at 20:34 Comment(0)
