Observable.forkJoin and array argument
Asked Answered
M

2

49

In the Observables forkJoin documentation, it says that args can be an array but it doesn't list an example doing so:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

I have tried a function similar to what I listed (below) but came up with an error:

:3000/angular2/src/platform/browser/browser_adapter.js:76 

EXCEPTION: TypeError: Observable_1.Observable.forkJoin is not a function

A sheared version of my function below:

processStuff( inputObject ) {
  let _self = this;

  return new Observable(function(observer) {
    let observableBatch = [];

    inputObject.forEach(function(componentarray, key) {
      observableBatch.push(_self.http.get(key + '.json').map((res: Response) => res.json()));
    });

    Observable.forkJoin(
      observableBatch
    // );
    ).subscribe(() => {
      observer.next();
      observer.complete();
    });

  });
}

The root of my question is related to a loop to end before proceeding as asked here: Angular2 Observable - how to wait for all function calls in a loop to end before proceeding?

But I haven't fully mastered the correct use of forkJoin with an array and the right syntax to do so.

I am very grateful for help you could offer.

NOTE: EXAMPLE OF THIRD FUNCTION THAT RETURNS AN OBSERVABLE

thirdFunction() {
  let _self = this;

  return Observable.create((observer) => {
  // return new Observable(function(observer) {
    ...

    observer.next(responseargs);
    observer.complete();
  });
}

processStuff(inputObject) {
  let _self = this;
  let observableBatch = [];

  inputObject.forEach((componentarray, key) => {
    observableBatch.push(_self.thirdFunction().map((res: Response) => res.json()));
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff(inputObject)
    .subscribe()
}
Medius answered 27/2, 2016 at 22:50 Comment(4)
Observable.forkJoin([http.get(), http.get(), http.get(), etc]).subscribe((res) => ...) See also https://mcmap.net/q/182609/-promise-all-with-rxjsSucculent
Did you forget to import 'rxjs/add/observable/forkJoin' or import 'rxjs/Rx'?Belldas
I just have this import {Observable} from "rxjs/Observable";Medius
See this, same applies for other operators/generators..Belldas
B
93

You need to import operators that are not loaded by default. That's what EXCEPTION Observable.xxxx is not a function usually means. You can either import all operators by adding complete rxjs to your bootstrap, for example:

import 'rxjs/Rx'

or by importing specific operators, in your case:

import 'rxjs/add/observable/forkJoin'

Another observation/suggestion about your code: try to stick with one syntax. You are mixing es5, es6, typescript... and while it is working it will only confuse you in the long run. Also, if you're just starting with Observables, try to avoid new Observable() and use creation operators instead;

processStuff( inputObject ) {
  let observableBatch = [];

  inputObject.forEach(( componentarray, key ) => {
    observableBatch.push( this.http.get( key + '.json').map((res: Response) => res.json()) );
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff( inputObject )
    .subscribe()
}

Finally, refer to the correct documentation - Angular2 uses RxJS v5 and link you provided is for RxJS v4. Docs are still incomplete for v5, but you can find descriptions in many of the source files.

Belldas answered 27/2, 2016 at 23:48 Comment(5)
Hi @Belldas I am having trouble because I am using a third function instead of this.http.get(... Could you please offer an example where that third function return new Observable(function(observer) { I have edited my original post to show an example. Thank you greatly for your help.Medius
figured it out - the thirdFunction needed this: return Observable.create((observer) => { ... observer.next("test"); observer.complete();Medius
I like this solution. What I can't figure out is how to handle errors on some of the request, when most are successful?Sitnik
I last version it should be "return forkJoin(observableBatch)" instead "return Observable.forkJoin(observableBatch)"Maramarabel
@Maramarabel answer is the one compatible with angular 7+ and the import is import {forkJoin} from 'rxjs';Stokehole
C
2

Here is a working demo using Angular 12. The syntax is quite a bit different than the accepted answer (No more Observable.):

https://stackblitz.com/edit/angular-map-to-forkjoin?file=src/app/app.component.ts

NOTE: Open the Developer Tools console to see the output of what I'm doing

First, this takes an array of JSON items (In this case, an array of Users) and converts them to individual HTTP Patch calls using the map method.

  private mapCalls(users: any[]): Observable<any>[] {
    return users.map(user => {
      // Each User will be patched to the endpoint
      // None of these calls will be made until forkJoin is used
      return this.http.patch(
        `https://jsonplaceholder.typicode.com/users/${user.id}`,
        user
      );
    });
  }

That is now an array of Observables. forkJoin is added as a wrapper in order for those calls to be made

return forkJoin(mappedCalls);

The end result can then be subscribed to which will output as a single array with the same count of items.

NOTE: If one call fails in a forkJoin, THEY WILL ALL FAIL.

Cuthbert answered 27/7, 2021 at 17:21 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.