RxJS: Treat first case specially, but continue streaming the rest
N

5

6

Say I wanted to separately handle the first event emitted by an Observable, but then continue my subscription.

For example, if I had the following chain:

observable.throttleTime(2000)
      .takeFirst(() => console.log('potato'))
      .switchMap((event:MouseEvent) => {
        const element:Element = event.target as Element;
        return this.innerObservable(('tomato'));
      })
      .subscribe(({result}) => console.log(result))

That fails, but how would I go about making such behaviour work?

Niels answered 26/5, 2017 at 21:5 Comment(1)
It's not clear what you want to do, and RxJS does not have a takeFirst operator. If you are using switchMap, its projection function is passed an index parameter. I'd suggest you use that to identify the first emitted value. – Forenamed
D
8

Many RxJS 5 operators that take a callback function also pass it the index of the item going through. This means you could write, for example, the following:

observable.throttleTime(2000)
    .map((value, index) => index === 0 ? 'potato' : value)
    ...
    .subscribe(({result}) => console.log(result))

The index is also passed to the callbacks of filter(), switchMap(), takeWhile() and other operators.
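
For RxJS 6 and later, where operators are applied through pipe(), the same index idea could be sketched roughly as follows. This is only an illustration, not the code from the question: the string events and the `innerObservable` helper are hypothetical stand-ins for the mouse events and the inner observable used there.

```typescript
import { of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Collects everything that happens, in order, so the result is easy to inspect.
const results: string[] = [];

// Hypothetical stand-in for the question's `this.innerObservable(...)`.
const innerObservable = (label: string) => of(`result for ${label}`);

// Hypothetical synchronous source standing in for throttled mouse events.
of('click-1', 'click-2', 'click-3')
  .pipe(
    // The projection function receives the index as its second argument;
    // index 0 identifies the first value that reaches this operator.
    switchMap((event, index) => {
      if (index === 0) {
        results.push('potato'); // special handling for the first event only
      }
      return innerObservable(event);
    })
  )
  .subscribe((result) => results.push(result));

console.log(results);
// [ 'potato', 'result for click-1', 'result for click-2', 'result for click-3' ]
```

The index is counted per subscription, so each new subscriber gets its own "first" event.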

Discuss answered 27/5, 2017 at 7:29 Comment(0)
C
2

You can use two subscribers, one for the default case to be applied to all events and the other for the special case of the first event:

import { of } from 'rxjs';
import { first } from 'rxjs/operators';

//emit 4 values
const source = of(1, 2, 3, 4);

/*
Subscriber One - only first value: 1
Subscriber Two - all values: 1
Subscriber Two - all values: 2
Subscriber Two - all values: 3
Subscriber Two - all values: 4
*/
const subscriberOne = source
  .pipe(first())
  .subscribe((val) => console.log(`Subscriber One - only first value: ${val}`));
const subscriberTwo = source.subscribe((val) =>
  console.log(`Subscriber Two - all values: ${val}`)
);

see it live on stackblitz

And to answer your question, here is a version with a special handler for the first event and a common handler for the rest of the events, excluding the first one:

import { of, skip } from 'rxjs';
import { first, publish } from 'rxjs/operators';

// emit 4 values
const source = of(1, 2, 3, 4);
// hold `others` until `connect()` is called; skip the first `source` event
const others = publish()(source.pipe(skip(1)));
// marker to make sure first event handler ran before the others
let onceRan = false;

// general case, has to be declared before so `others.subscribe()`
// runs before `others.connect()` is called
const subscriberTwo = others.subscribe((val) =>
  console.log(`Subscriber Two - all values: ${val}, onceRan: ${onceRan}`)
);

// first event handler, `first()` => stops after the first event
const subscriberOne = source.pipe(first()).subscribe((val) => {
  console.log(`Subscriber One - only first value: ${val}, onceRan: ${onceRan}`);
  // toggle the marker
  onceRan = true;
  // release `others`
  others.connect();
});

/*
Output:
Subscriber One - only first value: 1, onceRan: false
Subscriber Two - all values: 2, onceRan: true
Subscriber Two - all values: 3, onceRan: true
Subscriber Two - all values: 4, onceRan: true
*/

see it live on stackblitz

Cantoris answered 11/10, 2022 at 8:41 Comment(0)
M
1

A simple solution to treat the first event separately would be:

let isFirstCame = false;

observable
  .subscribe(({result}) => {
    if (!isFirstCame) {
      // here is our first one
      doSpecialWithTheFirst(result);
      isFirstCame = true;
    } else {
      // all the remaining ones
      doOtherWithRemaining();
    }
  });
Modern answered 14/1, 2018 at 19:45 Comment(1)
I agree, using RxJS doesn't mean it is taboo to use if/else. – Cantoris
C
0

Use publish along with first and concat operators. Here is an example:

Rx.Observable.interval(1000)
 .publish(shared => {
    return  shared.first().mapTo('FIRST')
       .concat(shared.skip(1));
})
.subscribe(x=>console.log(x))
Caresse answered 26/5, 2017 at 23:5 Comment(0)
A
0

You can create an operator that subscribes to the source observable, processes the first element and then removes itself from the chain. Let's call it onFirst because takeFirst is an existing operator in Java. RxJS 5 example:

Observable.prototype.onFirst = function(callback) {
  return Observable.create(observer => {
    let source = this.publish()
    let subs = source.subscribe({
      next(v) {
        subs.unsubscribe()
        source.subscribe(observer)
        callback(v)
      },
      error(e) { observer.error(e) },
      complete() { observer.complete() }
    })
    source.connect()
  })
}

// This prints
// first: 1
// 2
// 3
Observable.of(1, 2, 3)
  .onFirst(v => console.log("first: " + v))
  .subscribe(console.log)
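
With pipeable operators (RxJS 6 and later), a similar behaviour could be sketched without patching the prototype. The version below is an assumption-laden alternative rather than a port of the code above: it reuses the name `onFirst` for illustration and relies on the index that filter() passes to its predicate instead of publish()/connect().

```typescript
import { MonoTypeOperatorFunction, Observable, of } from 'rxjs';
import { filter } from 'rxjs/operators';

// Hypothetical pipeable operator: hands the first value to `callback`
// and lets only the remaining values through to the subscriber.
function onFirst<T>(callback: (value: T) => void): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    source.pipe(
      filter((value, index) => {
        if (index === 0) {
          callback(value); // handle the first value separately
          return false;    // and do not forward it downstream
        }
        return true;
      })
    );
}

// Collect the output in an array so the ordering is easy to check.
const log: string[] = [];

of(1, 2, 3)
  .pipe(onFirst((v) => log.push(`first: ${v}`)))
  .subscribe((v) => log.push(String(v)));

console.log(log); // [ 'first: 1', '2', '3' ]
```

Because filter() counts the index per subscription, every subscriber sees its own first value handled by the callback.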
Abingdon answered 4/6, 2017 at 15:59 Comment(0)
