I want to execute tap() only when i get the first emitted value
Something like:
Observable
.pipe(
tap(() => { /* execute only when I get the first emitted value */ })
)
.subscribe(() => {
// .....
})
I want to execute tap() only when i get the first emitted value
Something like:
Observable
.pipe(
tap(() => { /* execute only when I get the first emitted value */ })
)
.subscribe(() => {
// .....
})
You can use the index in map operators like concatMap
. Unlike other approaches this is totally flexible about the chosen index. Let's say if you want tap on 2nd emission index === 1
or any predicate like index % 2 === 0
// these are because of using rxjs from CDN in code snippet, ignore them
const {of, interval} = rxjs;
const {take, tap, concatMap} = rxjs.operators;
// main code
const stream = interval(250).pipe(take(4))
stream.pipe(
concatMap((value, index) => index === 0
? of(value).pipe(
tap(() => console.log('tap'))
)
: of(value)
)
)
.subscribe(x => console.log(x));
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/rxjs.umd.js"></script>
If I correctly get your idea you want to execute tap()
only at the beginning of the stream subscription and not other times. This is my custom operator for that:
import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';
export function startWithTap<T>(callback: () => void) {
return (source: Observable<T>) =>
of({}).pipe(tap(callback), switchMap((o) => source));
}
As for example for usage of this operator would be:
this.api.getData().pipe(
startWithTap(() => this.loading.start()),
)
This is my actual code example where loading gets started when someone subscribes to the Observable created by api service (by means of httpClient).
UPDATE
Use this instead of the above implementation, because this one only uses defer
, instead of using of
, tap
and switchMap
:
export function startWithTap<T>(callback: () => void) {
return (source: Observable<T>) =>
defer(() => {
callback();
return source;
});
}
I like the approach of jal's answer and would suggest wrapping it in its own operator:
export function tapOnce<T>(tapFn: (t: T) => void, tapIndex = 0): OperatorFunction<T, T> {
return source$ => source$.pipe(concatMap((value, index) => {
if (index === tapIndex) {
tapFn(value);
}
return of(value);
}));
}
Usage would look like this:
stream.pipe(tapOnce(() => console.log('tapping once'), 1));
This could even be abstracted further to an operator that takes a function to determine whether it should tap according to the given value/index:
export function tapWhen<T>(tapFn: (t: T) => void, evaluateFn: (index: number, t: T) => boolean): OperatorFunction<T, T> {
return source$ => source$.pipe(concatMap((value, index) => {
if (evaluateFn(index, value)) {
tapFn(value);
}
return of(value);
}));
}
If someone interested, here is an extra simple implementation of tapN. So it executes the specified callback function for each emission until the number of emissions is equal to nEmissions. In order to execute the tap() function only for the first element you would do tapN(1), but you could also execute tap for the 3 firsts emission with tapN(3) for example.
/* Executes the specified callback function for each emission until the number of emissions is equal to nEmissions*/
export const tapN = <T>(nEmissions, callback: (T) => void) => (source$: Observable<T>): Observable<T> =>
defer(() => {
let counter = 0;
return source$.pipe(tap((item) => {
if (counter < nEmissions) {
callback(item);
counter++;
}
}));
});
In your code:
Observable
.pipe(
tapN(1, () => { /* this code would be only executed on the first emitted value */ })
)
.subscribe(() => {
// .....
})
Apart from the already mentioned options you also use multicast
.
multicast(new Subject(), s => concat(
s.pipe(
take(1),
tap(v => console.log('tap', v)),
),
s
)
Live demo: https://stackblitz.com/edit/rxjs-shvuxm
() => new Subject()
for this to work properly. Providing just one Subject will mess up the values the observable emits if you subscribe multiple times, see: stackblitz.com/edit/rxjs-pul1gw?file=index.ts. –
Rockyrococo You can share() your main Observable like below:
import { timer, of, BehaviorSubject, interval } from 'rxjs';
import { tap, mapTo, share, shareReplay, } from 'rxjs/operators';
const source$ = timer(1000)
.pipe(
tap((v) => console.log('SIDE EFFECT')),
mapTo('RESULT')
)
const sharedSource$ = source$.pipe(share());
// or shareReplay(1) if you want to ensure every subscriber get the last value event if they will subscribe later;
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
https://stackblitz.com/edit/typescript-qpnbkm?embed=1&file=index.ts
This is an example like in learn-rxjs
This will execute only on the first emit:
skipWhile((x, i) => {
if (i === 0) /* Do something */;
return false;
}),
If you want the callback to execute every time, you can use this:
filter((x, i) => {
if (i === 0) /* Do something */;
return true;
}),
(Updating my earlier incorrect answer)
Based on cartant's comment and link provided, he has already done the work to create an operator that does this, it is in the 'rxjs-etc' package. A solution based on his operator is to install 'rxjs-etc', then:
import { initial } from 'rxjs-etc/operators';
observable$.pipe(
initial(src$ => src$.pipe(tap(() => {/* execute only on first value */})))
).
subscribe(() => {
// .....
})
Working StackBlitz example.
© 2022 - 2025 — McMap. All rights reserved.
stream$.pipe(first() /* sameas take(1)*/, tap(...)).subscribe(() => {})
– Kinsleyfirst
andtake
will auto-subscribe the subscription after one emit – Jigging