How to 'wait' for two observables in RxJS
M

8

128

In my app I have something like:

this._personService.getName(id)
      .concat(this._documentService.getDocument())
      .subscribe((response) => {
                  console.log(response)
                  this.showForm()
       });

 //Output: 
 // [getnameResult]
 // [getDocumentResult]

 // I want:
 // [getnameResult][getDocumentResult]

This gives me two separate results: first the one from _personService, then the one from _documentService. How can I wait for both results before calling this.showForm(), and then work with the result of each one?

Mojgan answered 16/5, 2017 at 14:22 Comment(7)
forkJoin github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/… - Aw
for what I understand, you already are, by virtue of concat - Alexander
@Alexander In this case, the values are being emitting separately. One after other. - Mojgan
in forkJoin' subscribe will get one result - tuple with first and second responses - this is what exactly that you asked? - Aw
you can look at forkJoin in my blog post - medium.com/@juliapassynkova/q-map-to-rxjs-981936a2b22d - Aw
@ggui They are right, use forkJoin. - Henhouse
forkjoin does not always work as it requires both observables to "complete". at times you want to both have fired "next" but not necessarily "completed" - Crawfish
H
180

Last Update: Mar, 2022.

RxJS v7: combineLatestWith

From reactiveX documentation:

Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of that formula.

// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    
name$.pipe(
        combineLatestWith(document$)
      )
      .subscribe(([name, document]) => {
           this.name = name;
           this.document = document;
           this.showForm();
       })

(Deprecated) RxJS v6 combineLatest()

From reactiveX documentation:

Whenever any input Observable emits a value, it computes a formula using the latest values from all the inputs, then emits the output of that formula.

(Update: Feb, 2021):

// Deprecated (RxJS v6)
// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    
name$.combineLatest(document$, (name, document) => ({name, document}))
    .subscribe(pair => {
           this.name = pair.name;
           this.document = pair.document;
           this.showForm();
       })

(alternate syntax): combineLatest(observables)

// Deprecated (RxJS v6)
// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    
combineLatest(name$, document$, (name, document) => ({name, document}))
    .subscribe(pair => {
           this.name = pair.name;
           this.document = pair.document;
           this.showForm();
       })

zip vs combineLatest

(Update: Oct, 2018) I previously suggested the zip method. However, for some use cases combineLatest has advantages over zip, so it is important to understand the differences.

combineLatest emits the latest value from each observable, while zip pairs the emitted items by their position in the sequence.

For example, suppose observable #1 has just emitted its 3rd item and observable #2 has already emitted its 5th item. With zip, the result is the 3rd emitted value of both observables.

In this situation the result using combineLatest is the 3rd value of observable #1 and the 5th value of observable #2, which feels more natural.
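The difference can be seen in a minimal sketch. It assumes RxJS 6.5 or later and uses two Subjects (a$ and b$ are illustrative names, not part of the original services) so that the emission order is fully controlled:

```typescript
import { Subject, combineLatest, zip } from 'rxjs';

// Hypothetical sources; Subjects let us decide exactly when each one emits.
const a$ = new Subject<number>();
const b$ = new Subject<string>();

const zipped: Array<[number, string]> = [];
const combined: Array<[number, string]> = [];

// zip pairs values by index: 1st with 1st, 2nd with 2nd, ...
zip(a$, b$).subscribe(pair => zipped.push(pair));

// combineLatest emits on every new value once each source has emitted at least once.
combineLatest([a$, b$]).subscribe(pair => combined.push(pair));

a$.next(1);   // neither emits yet: b$ has no value
b$.next('x'); // both emit [1, 'x']
b$.next('y'); // zip waits for a 2nd value from a$; combineLatest emits [1, 'y']
a$.next(2);   // both emit [2, 'y']

console.log(zipped);   // [[1, 'x'], [2, 'y']]
console.log(combined); // [[1, 'x'], [1, 'y'], [2, 'y']]
```

For two HTTP-style observables that each emit once, both operators give the same single pair, so the choice only matters when a source can emit more than once.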


Observable.zip(observables)

(Original answer: Jul, 2017) Observable.zip method is explained in reactiveX documentation:

Combines multiple Observables to create an Observable whose values are calculated from the values, in order, of each of its input Observables.

// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    
Observable
    .zip(name$, document$, (name: string, document: string) => ({name, document}))
    .subscribe(pair => {
           this.name = pair.name;
           this.document = pair.document;
           this.showForm();
       })

A side note (applies to both methods)

The last parameter, where we have provided a function, (name: string, document: string) => ({name, document}) is optional. You can skip it, or do more complex operations:

If the latest parameter is a function, this function is used to compute the created value from the input values. Otherwise, an array of the input values is returned.

So if you skip the last part, you get an array:

// Observables to combine
const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();
    
Observable
    .zip(name$, document$)
    .subscribe(pair => {
           this.name = pair[0];
           this.document = pair[1];
           this.showForm();
       })
Hydroxide answered 5/7, 2017 at 21:53 Comment(5)
Is it possible to wait for completion of one observable with this ?, my observable have another inner observable, which in turn rely on an http.get ? - Sternwheeler
In your Jan 2020 update, why map the array to object? Seems like an unnecessary step when you could destruct the array within the subscribe method. That would be my only comment, the rest looks good. - Sherwood
OOOOHHHH, I was looking for combineLatest operator for an hour... many thanks - Briney
How do you unsubscribe from the combined observables ? Just unsubscribe from the observable that is returned from combinedLatest().subscribe() ? - Civic
combineLatest has been deprecated in favour of combineLatestWith, see here: rxjs.dev/api/operators/combineLatest - Parkman
V
69

Use forkJoin() method of observables. Check this link for reference

From the RXJS docs

This operator is best used when you have a group of observables and only care about the final emitted value of each. One common use case for this is if you wish to issue multiple requests on page load (or some other event) and only want to take action when a response has been received for all. In this way it is similar to how you might use Promise.all.

forkJoin([character, characterHomeworld]).subscribe(results => {
  // results[0] is our character
  // results[1] is our character homeworld
  results[0].homeworld = results[1];
  this.loadedCharacter = results[0];
});

Code taken from: https://coryrylan.com/blog/angular-multiple-http-requests-with-rxjs
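As a minimal sketch of the "final emitted value" behavior described above (assuming RxJS 6.5 or later; name$ and document$ are stand-ins built from a Subject and of(), not the real services), forkJoin stays silent until every source has completed:

```typescript
import { Subject, forkJoin, of } from 'rxjs';

// Hypothetical sources: name$ is a Subject so we control when it completes.
const name$ = new Subject<string>();
const document$ = of('document'); // emits once, then completes

const results: Array<[string, string]> = [];
forkJoin([name$, document$]).subscribe(pair => results.push(pair));

name$.next('draft name');
name$.next('final name');

// name$ has not completed yet, so forkJoin has emitted nothing so far.
const emittedBeforeComplete = results.length;

name$.complete();

console.log(emittedBeforeComplete); // 0
console.log(results);               // [['final name', 'document']]
```

HTTP request observables such as Angular's HttpClient calls complete after one response, which is why forkJoin fits the question's case; a source that never completes would leave the subscriber waiting.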

Vernita answered 14/7, 2017 at 2:17 Comment(3)
Is it possible to wait for completion of one observable with this ?, my observable have another inner observable, which in turn rely on an http.get ? - Sternwheeler
@Sternwheeler If you wanna do sth. after completion of one observable, can you use nesting subscription? - Featherston
@YuweiHE very bad advice, for that there is switchmap/flatmap operators. avoid nested subscription - Aglaia
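A minimal sketch of the switchMap approach mentioned in the last comment, assuming RxJS 6 or later. The functions getPersonId and getDocumentFor are hypothetical stand-ins for an outer request and an inner request that depends on its result:

```typescript
import { of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical requests: the second one needs the value produced by the first.
const getPersonId = () => of(42);
const getDocumentFor = (id: number) => of(`document-for-${id}`);

const received: string[] = [];

getPersonId()
  .pipe(
    // Map each outer value to an inner observable and flatten it,
    // instead of subscribing inside another subscribe callback.
    switchMap(id => getDocumentFor(id)),
  )
  .subscribe(doc => received.push(doc));

console.log(received); // ['document-for-42']
```

This keeps a single subscription to manage, which is the main reason nested subscriptions are discouraged.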
C
33

The article RxJS Operators for Dummies: forkJoin, zip, combineLatest, withLatestFrom helped me a lot. As the name states, it describes the following combination operators: forkJoin, zip, combineLatest, and withLatestFrom.

Any of them could be what you are looking for, depending on the case. Check the article for more info.

Cobweb answered 22/2, 2019 at 16:29 Comment(1)
Thanks the first link you have given i.e "RxJs Operators ..." is a must read and the best and easiest explanation you can ever get - Adamantine
A
10

An improvement on Hamid Asghari's answer that destructures the arguments directly and infers the types automatically (when you use TypeScript):

const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();

combineLatest([name$, document$]).subscribe(([name, document]) => {
    this.name = name;
    this.document = document;
    this.showForm();
});

BONUS: You can also handle errors with the above approach, as follows:

import { combineLatest, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
//...

const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();

combineLatest([
  name$.pipe(     catchError( () => of(null as string  ) ) ), 
  document$.pipe( catchError( () => of(null as Document) ) ), // 'Document' is arbitrary type
]).subscribe(([name, document]) => {
    this.name = name;          // or null if error
    this.document = document;  // or null if error
    this.showForm();
});
Arcograph answered 6/4, 2020 at 18:35 Comment(0)
L
4

June 2021

With rxjs 6.6.7

Use combineLatest with an array argument, like this; the other call signatures are deprecated:

combineLatest([a$ , b$]).pipe(
      map(([a, b]) => ({a, b})) //change to [a , b] if you want an array
    )

Also see @nyxz post

zip - the love birds, always work as a team, triggers only when all observables return new values

combineLatest - the go dutch, start trigger once all observables return new values, then wait for no man, trigger every time when either observable return new value.

withLatestFrom - the master slave, master first waits for slave, after that, action get triggered every time only when master return new value.

forkJoin - the final destination, trigger once when all observables have completed.

From : https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom/amp
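Of the four operators summarized above, withLatestFrom is the one not shown elsewhere on this page. A minimal sketch, assuming RxJS 6 or later, with primary$ and secondary$ as illustrative Subjects:

```typescript
import { Subject } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

// Hypothetical sources: only primary$ triggers output.
const primary$ = new Subject<string>();
const secondary$ = new Subject<string>();

const output: Array<[string, string]> = [];
primary$
  .pipe(withLatestFrom(secondary$))
  .subscribe(pair => output.push(pair));

primary$.next('p1');   // dropped: secondary$ has not emitted yet
secondary$.next('s1'); // no output: secondary$ never triggers an emission
secondary$.next('s2'); // no output
primary$.next('p2');   // emits ['p2', 's2']
primary$.next('p3');   // emits ['p3', 's2']

console.log(output); // [['p2', 's2'], ['p3', 's2']]
```

Note that the first primary value is lost because the secondary source had nothing to pair it with; combineLatest would also wait at that point, but it would later emit when secondary$ produced a value.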

Limitation answered 1/6, 2021 at 16:28 Comment(0)
M
2

Have a look at the 'combineLatest' method, it might be appropriate here. http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-combineLatest

const { Observable } = Rx

const name$ = this._personService.getName(id);
const document$ = this._documentService.getDocument();

Observable
    .combineLatest(name$, document$, (name, document) => ({ name, document }))
    .first() // or not, implementation detail
    .subscribe(({ name, document }) => {
        // here we have both name and document
        this.showForm()
    })
Monkfish answered 16/5, 2017 at 16:21 Comment(0)
C
2

For me, the sample operator was the best solution.

const source = Observable.interval(500);
const example = source.sample(Observable.interval(2000));
const subscribe = example.subscribe(val => console.log('sample', val));

So the last emitted value of the first observable (source) is shown only when the second observable (the Observable.interval(2000) passed to sample) emits.

In my case, I wait for form validation and another DOM event.
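The snippet above uses the RxJS 5 chained style. A rough equivalent with pipeable operators, assuming RxJS 6 or later, is sketched below; source$ and notifier$ are illustrative Subjects used in place of the intervals so the timing is explicit:

```typescript
import { Subject } from 'rxjs';
import { sample } from 'rxjs/operators';

// Hypothetical sources standing in for the two intervals.
const source$ = new Subject<number>();
const notifier$ = new Subject<void>();

const sampled: number[] = [];
source$.pipe(sample(notifier$)).subscribe(value => sampled.push(value));

source$.next(1);
source$.next(2);
notifier$.next(); // emits 2, the latest source value
notifier$.next(); // no new source value since the last sample, so nothing is emitted
source$.next(3);
notifier$.next(); // emits 3

console.log(sampled); // [2, 3]
```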

Craniotomy answered 10/12, 2017 at 23:2 Comment(0)
R
0

You can use 'zip' or 'buffer' like the following.

function getName() {
    return Observable.of('some name').delay(100);
}

function getDocument() {
    return Observable.of('some document').delay(200);
}

// CASE1 : concurrent requests
Observable.zip(getName(), getDocument(), (name, document) => {
    return `${name}-${document}`;
})
    .subscribe(value => console.log(`concurrent: ${value}`));

// CASE2 : sequential requests
getName().concat(getDocument())
    .bufferCount(2)
    .map(values => `${values[0]}-${values[1]}`)
    .subscribe(value => console.log(`sequential: ${value}`));
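With RxJS 6 and later the chained methods above are replaced by creation functions and pipeable operators. A sketch of the sequential case in that style, assuming the same two single-value sources (the delays are omitted here so the result is available synchronously):

```typescript
import { concat, of } from 'rxjs';
import { bufferCount, map } from 'rxjs/operators';

// Stand-ins for the two requests; each emits one value and completes.
const name$ = of('some name');
const document$ = of('some document');

const combinedValues: string[] = [];

// concat subscribes to document$ only after name$ completes;
// bufferCount(2) collects both values into a single array emission.
concat(name$, document$)
  .pipe(
    bufferCount(2),
    map(([name, document]) => `${name}-${document}`),
  )
  .subscribe(value => combinedValues.push(value));

console.log(combinedValues); // ['some name-some document']
```

This is the closest fix to the code in the question: it keeps concat but delivers both results in one emission.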
Rockbound answered 16/5, 2017 at 14:45 Comment(0)
