Work around for more than 6 forkJoin parameters?
B

5

10

I have a need/desire to use more than six parameters in a forkJoin. Currently, based on the answer to another related question it doesn't appear possible to send more than 6 parameters to forkJoin.

However, the official documentation says: "forkJoin is an operator that takes any number of Observables which can be passed either as an array or directly as arguments."

forkJoin - Official Docs

Well, I'm doing that and I get error TS2322: Type 'foo' is not assignable to type 'bar[]'.

In my research I've also found that it's best not to pass the arguments as an array if you have promises that return different types, as that will cast them all to the same type. - Source

Here's my code. I'm using the latest version of Typescript and Angular 4.

ngOnInit() {
    this.spinner.show();
    Observable.forkJoin(
        this.loadParams(),              // Returns an Observable<Object>
        this.service.getPromiseType1(), // The rest return Observable<Array>
        this.service.getPromiseType2(),
        this.service.getPromiseType3(),
        this.service.getPromiseType4(),
        this.service.getPromiseType5(),
        this.service.getPromiseType6(),
    ).finally(() => this.spinner.hide())
        .subscribe(
            ([param, promise1, promise2, promise3, promise4, promise5, promise6]) => {
                this.job = param;
                this.value1 = promise1;
                this.value2 = promise2;
                this.value3 = promise3;
                this.value4 = promise4;
                this.value5 = promise5;
                this.value6 = promise6;
            }, (error) => errorHandlingFunction(error)
        );
}

If I remove any single parameter so that it's passing six parameters to forkJoin, it works fine. So my question is, in my case where I want to load the object observable and subsequent array observables all in one call, is there another way to do this? Is this a bug with forkJoin since the official documentation says it should be able to accept any number of Observables?

I've tried creating an Array of type Observable and using array.forEach() inside the forkJoin but it complains about returning type void. That seemed like a janky way of doing it anyhow.

Brazen answered 28/3, 2018 at 19:34 Comment(2)
Why do you need to make so many simultaneous requests?Worldlywise
Each request is hitting a different API endpoint returning different data needed for a form that is used in this particular component. The drop downs in the form need to be populated with that data when the page loads, hence why it needs to be done simultaneously.Brazen
R
7

As the answer to the question you linked explains, the maximum number of arguments is constrained only by the type definitions, not by the runtime source itself. The type definitions are useful because they declare the array element types that will be produced for the next step of the observable stream (the types for [param, promise1, promise2, ...] in your case).

It sounds like strict type safety around the assignments in your subscription handler is what's causing the issue for you. Since you have more than 6 observables, it defaults the resulting parameters to a shared type which likely does not match the types of the fields that you are trying to assign.
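
A minimal sketch of that fallback, without RxJS: the hypothetical joinAll below is a Promise-based stand-in (not the real forkJoin signature) that declares typed overloads for up to two sources plus a catch-all. A third source still runs fine, but the result collapses to one shared element type, which is roughly what happens with a seventh forkJoin argument. The names joinAll, Mixed, two, and three are assumptions for illustration.

```typescript
// Hypothetical stand-in for forkJoin: typed overloads for one or two sources,
// then a catch-all overload where every source shares a single type T.
function joinAll<A>(a: Promise<A>): Promise<[A]>;
function joinAll<A, B>(a: Promise<A>, b: Promise<B>): Promise<[A, B]>;
function joinAll<T>(...sources: Promise<T>[]): Promise<T[]>;
function joinAll(...sources: Promise<unknown>[]): Promise<unknown[]> {
    // The runtime accepts any number of sources; only the overloads are capped.
    return Promise.all(sources);
}

// Two sources match a typed overload: Promise<[number, string[]]>.
const two = joinAll(Promise.resolve(1), Promise.resolve(['a']));

// Three sources only fit the catch-all, so one shared type must cover them all.
type Mixed = number | string[] | boolean;
const three = joinAll<Mixed>(Promise.resolve(1), Promise.resolve(['a']), Promise.resolve(true));

three.then(([first, second, third]) => {
    // Each element is typed Mixed here, so `const n: number = first;`
    // would be rejected with TS2322 even though first is 1 at runtime.
    console.log(typeof first, Array.isArray(second), typeof third);
});
```

Nothing about the runtime behaviour changes between the two calls; only the static types differ.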

There are a few ways around this. You can cast the arguments in your subscription handler or you can add your own types yourself. Casting arguments is the quick-and-dirty solution, but it causes you to lose type safety. Adding types yourself would let you maintain type safety, but it could also mean that you end up with any number of factory method declarations. Place the below in a type definition file (*.d.ts) anywhere in your project. I like to place type definitions like this in a typings/ directory at a sibling level to my app/ directory.

import { Observable, SubscribableOrPromise } from 'rxjs/Observable';

declare module 'rxjs/observable/ForkJoinObservable' {
    namespace ForkJoinObservable {
        export function create<T, T2, T3, T4, T5, T6, T7>(v1: SubscribableOrPromise<T>, v2: SubscribableOrPromise<T2>, v3: SubscribableOrPromise<T3>, v4: SubscribableOrPromise<T4>, v5: SubscribableOrPromise<T5>, v6: SubscribableOrPromise<T6>, v7: SubscribableOrPromise<T7>): Observable<[T, T2, T3, T4, T5, T6, T7]>;
        export function create<T, T2, T3, T4, T5, T6, T7, R>(v1: SubscribableOrPromise<T>, v2: SubscribableOrPromise<T2>, v3: SubscribableOrPromise<T3>, v4: SubscribableOrPromise<T4>, v5: SubscribableOrPromise<T5>, v6: SubscribableOrPromise<T6>, v7: SubscribableOrPromise<T7>, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6, v7: T7) => R): Observable<R>;
    }
}

This process is explained in more detail in the TypeScript documentation page for Declaration Merging.


Edit: It looks like I'm using an older version of RxJS and the structure has changed a bit since. The following should be closer to the type declarations that should work with the current structure:

import { Observable, ObservableInput } from 'rxjs/Observable';

declare module 'rxjs/Observable' {
    namespace Observable {
        export function forkJoin<T, T2, T3, T4, T5, T6, T7>(sources: [ObservableInput<T>, ObservableInput<T2>, ObservableInput<T3>, ObservableInput<T4>, ObservableInput<T5>, ObservableInput<T6>, ObservableInput<T7>]): Observable<[T, T2, T3, T4, T5, T6, T7]>;
        export function forkJoin<T, T2, T3, T4, T5, T6, T7>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, v7: ObservableInput<T7>): Observable<[T, T2, T3, T4, T5, T6, T7]>;
    }
}

I am basing these off of the current forkJoin type declarations.

As for the module augmentation: the code above modifies the type declarations for the module identified by the path 'rxjs/Observable', which is the same path you would use to import the Observable class. The module as defined by RxJS exports the Observable class and its fields. Our augmentation uses a namespace block to add type declarations under the Observable namespace (e.g., type declarations for Observable.myFunctionOrField), which look the same to callers as static functions on the class. Effectively, this declares additional overloads of Observable.forkJoin.
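
The merging mechanism can be seen in isolation with plain interface merging, which follows the same idea as the module augmentation above. This is only a sketch: Joiner and joiner are hypothetical names, and nothing here touches RxJS itself.

```typescript
// A library ships an interface with overloads for up to two arguments.
interface Joiner {
    join<A>(a: A): [A];
    join<A, B>(a: A, b: B): [A, B];
}

// Declaring the same interface again merges into it, so this adds a third
// overload without touching the original declaration or the runtime code.
interface Joiner {
    join<A, B, C>(a: A, b: B, c: C): [A, B, C];
}

// The implementation was always variadic; only the types were limited.
const joiner: Joiner = {
    join: (...args: unknown[]) => args as any,
};

// Resolves to the merged overload, so each element keeps its own type.
const [count, label, done] = joiner.join(1, 'two', true);
const total: number = count + 1;
console.log(total, label.toUpperCase(), done); // prints: 2 TWO true
```

Augmenting a module from a .d.ts file works on the same principle, except that the extra declarations are merged into a module that lives in another package.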

Redraft answered 28/3, 2018 at 20:44 Comment(11)
Thank you for the help. I read through the documentation for Declaration Merging and couldn't quite put your code together with the examples they show for module augmentation. How exactly does the code you provided hook into the forkJoinObservable module? When I use your code as is in a new .d.ts file, I get a duplicate function error with the two create functions so I'm wondering how this hooks into the project in order to resolve those conflicts. I also thought I knew how to cast the arguments in the subscription handler but after trying on my own and doing some searching apparently I don't.Brazen
Would it be possible @MikeHill to provide an example of casting an argument in the subscription handler? I was trying to do something like this. .subscribe(([param<Param>, promise1<Promise1> ]) => {Brazen
You would have three options for casting: the original observable, the subscription parameters, and the assignment operations themselves. The observable would be something like this: (Observable.forkJoin(...) as Observable<[T, T2, T3, ...]>).subscribe(...). The subscription parameters would be: .subscribe(([param, promise1, ...]: [T, T2, ...]) => {...}). The assignments would be: this.job = param as T; this.value1 = promise1 as T2; .... Try each and see which suits your style preferences best. Your compilation strictness settings may restrict these options for you, as well.Redraft
It looks like I was on an older version of RxJS. I added type declarations for the latest version and tried to explain how that augmentation works. Please let me know if that clears things up for you.Redraft
Here is a reference on how to add RxJS operators that you might find helpful, too: blog.novatec-gmbh.de/adding-custom-operators-rxjs. Some things may have changed with the implementation specifics since the time that post was written, kind of like with my original answer, but the general details should be pretty similar. Note that linked example is for operators (i.e. functions executed on existing Observable objects, like obs$.map) rather than static methods (like Observable.map), though.Redraft
Fantastic! Thank you again so much for the thorough explanation, resources, and examples. This is what I love about programming. People who are able to share a wealth of information to help others learn new things.Brazen
It looks like you are missing the v7: in front of ObservableInput<T7> in your updated example.Nutritious
I also don't think this works with angular-cli and rxjs 6Nutritious
@Nutritious good call on the missing parameter. I've added that now. Regarding RxJS 6, I'm not sure what changes have been made that would affect this. My answer is based off of this version, including the use of Angular CLI, and the current version doesn't seem much different. Feel free to suggest an edit or add your own answer if this doesn't work for you though.Redraft
@MikeHill The core concept is the same but there were a couple things that needed to be changed in the code written to work with rxjs 6+. See my answer below.Nutritious
Is this still up to date? I cannot find a maximum of 6 params in the type definitionShibboleth
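
Two of the casting options from the comments above can be sketched without RxJS. The snippet uses a hypothetical Promise-based loadAll as a stand-in for a forkJoin call whose result type has collapsed; Param, Loaded, loadAll, and the sample values are assumptions for illustration. The parameter-annotation option is left out because, as noted above, strict compiler settings may reject it.

```typescript
interface Param { id: number; }
type Loaded = [Param, string[]];

// Hypothetical stand-in for a forkJoin call whose result type has collapsed:
// the values are fine at runtime, but the compiler only sees unknown[].
function loadAll(): Promise<unknown[]> {
    return Promise.all([Promise.resolve({ id: 7 }), Promise.resolve(['x', 'y'])]);
}

// Option 1: assert the type of the whole result once, up front.
const summaryA = (loadAll() as Promise<Loaded>).then(
    ([param, values]) => `${param.id}:${values.join(',')}`
);

// Option 3: assert each value at the point of assignment.
const summaryB = loadAll().then(([param, values]) => {
    const job = param as Param;
    const value1 = values as string[];
    return `${job.id}:${value1.join(',')}`;
});

summaryA.then((s) => console.log(s)); // prints: 7:x,y
summaryB.then((s) => console.log(s)); // prints: 7:x,y
```

Either way these are assertions rather than checks: the compiler trusts the tuple you write, so a wrong order or type goes unnoticed until runtime.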
A
9

Starting with RxJS 6.5, you can pass a dictionary of observables to forkJoin, so any number of sources can be combined. See the first example here:

    import { forkJoin, of } from 'rxjs';

    forkJoin({ first: of(1), second: of(2), third: of(3) })
        .subscribe(result => { console.log(result.first); });
Aristaeus answered 17/12, 2019 at 16:10 Comment(1)
This answer is far preferable to hacking the type definition file and IMHO is the more readable syntax even for fewer parameters.Missilery
N
5

Thanks @MikeHill for getting me pointed in the correct direction. The solution that ended up working for me was to add a typings.d.ts to the angular-cli generated src folder. I believe you can also use a typings folder in the same location but you will need to update your tsconfig.app.json file, see this article for more information. https://github.com/angular/angular-cli/blob/6449a753641340d8fc19a752e1a1ced75f974efa/docs/documentation/1-x/stories/third-party-lib.md

typings.d.ts for 7 parameters with forkJoin

import { ObservableInput, Observable } from 'rxjs';
import { forkJoin } from 'rxjs/internal/observable/forkJoin';

declare module 'rxjs/internal/observable/forkJoin' {
  export function forkJoin<T, T2, T3, T4, T5, T6, T7>(
    sources: [
      ObservableInput<T>,
      ObservableInput<T2>,
      ObservableInput<T3>,
      ObservableInput<T4>,
      ObservableInput<T5>,
      ObservableInput<T6>,
      ObservableInput<T7>
    ],
  ): Observable<[T, T2, T3, T4, T5, T6, T7]>;
  export function forkJoin<T, T2, T3, T4, T5, T6, T7>(
    v1: ObservableInput<T>,
    v2: ObservableInput<T2>,
    v3: ObservableInput<T3>,
    v4: ObservableInput<T4>,
    v5: ObservableInput<T5>,
    v6: ObservableInput<T6>,
    v7: ObservableInput<T7>,
  ): Observable<[T, T2, T3, T4, T5, T6, T7]>;
}
Nutritious answered 16/10, 2018 at 4:21 Comment(0)
Y
4

Fork joins can be nested into logical groups of six or fewer parameters. The following should work (though I haven't tested it):

ngOnInit() {
    this.spinner.show();
    Observable.forkJoin(
        this.loadParams(),
        Observable.forkJoin(
            this.service.getPromiseType1(),
            this.service.getPromiseType2(),
            this.service.getPromiseType3(),
        ),
        Observable.forkJoin(
            this.service.getPromiseType4(),
            this.service.getPromiseType5(),
            this.service.getPromiseType6(),
        )
    )
    .finally(() => this.spinner.hide())
    .subscribe(payloads => {
        [
            this.job,
            [
                this.value1,
                this.value2,
                this.value3,
            ],
            [
                this.value4,
                this.value5,
                this.value6,
            ],
        ] = payloads;
    }, (error) => {
        errorHandlingFunction(error);
    });
}
Yetac answered 29/11, 2018 at 10:16 Comment(0)
L
0

Are you sure about the 6 limit?

This example, with 10 Observables passed to forkJoin, seems to work:

const oArray = 
[ ... new Array(10).keys() ]
.map(n => Observable.of(n))
.reduce((obsArray, obs) => {
    obsArray.push(obs);
    return obsArray
}, new Array<Observable<number>>())

Observable
.forkJoin(oArray)
.subscribe(console.log, console.error, () => console.log('DONE'))

Plus, I do not understand the finally operator you use. I do not have it in RxJs 5.5.2

Lankford answered 28/3, 2018 at 20:47 Comment(1)
It appears that it's only an issue because the maximum number of arguments is constrained by the type definitions, as @MikeHill has pointed out in another response. I'm going to attempt to create my own type definitions for this as he explained so that I can take in more types. As for the finally operator, it returns an Observable that calls a callback function when the source terminates, whether on complete or on error. RxJs finally DocumentationBrazen
