How to pass results between chained observables

Abstract problem: Every time a source Observable emits an event, a sequence of API calls and Angular services needs to be triggered. Some of those invocations depend on previous results.

In my example, the source Observable startUploadEvent$ triggers a series of dependent invocations.

Using destructuring this can be written like this:

this.startUploadEvent$.pipe(
      concatMap(event => this.getAuthenticationHeaders(event)),
      map(({ event, headers }) => this.generateUploadId(event, headers)),
      tap(({ event, headers, id }) => this.emitUploadStartEvent(id, event)),
      concatMap(({ event, headers, id }) => this.createPdfDocument(event, headers, id)),
      concatMap(({ event, headers, id, pdfId }) => this.uploadBilderForPdf(event, pdfId, headers, id)),
      mergeMap(({ event, headers, id, pdfId, cloudId }) => this.closePdf(cloudId, event, headers, id, pdfId)),
      tap(({ event, headers, id, pdfId, cloudId }) => this.emitUploadDoneEvent(id, event, cloudId)),
).subscribe()

It almost reads like an imperative approach. But it has certain problems:

  • The destructuring chain is repeated over the code and gets longer and longer { event, headers, id, pdfId, cloudId }
  • Methods (like generateUploadId(event, headers)) are required to receive all previous values so that they are able to pass them to the next pipe, even if the method itself doesn't require it
  • Inner Observables (within the methods) are required to map the values so that further pipe stages can destructure them:


private closePdf(cloudId, event, headers, id, pdfId) {
    return this.httpClient.post(..., { headers } )
        .pipe(
             //...,
             map(() => ({ event, headers, id, pdfId, cloudId }))
        )
}

It would be nice if the compiler could take care of the boilerplate (like with async await) to write the code that reads like this (with none of the problems mentioned above):

private startUpload(event: StartUploadEvent) {
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id, event)

    const pdfId = this.createPdfDocument(event, headers, id)
    this.uploadBilderForPdf(event, pdfId, headers, id)

    const cloudId = this.closePdf(headers, pdfId)
    this.emitUploadDoneEvent(id, event, cloudId)

    return cloudId
  }

How can I pass results between chained observables without the problems I've mentioned? Is there an RxJS concept I've missed?

Pothouse answered 13/8, 2020 at 0:58 Comment(2)
why not use closure? - Lochia
I think closures might help you as well. Otherwise, you really have to map() results into arrays or objects and then destructure them. Another option could be using toPromise() on each Observable and then awaiting each of them. - Baguio
C
29

You certainly shouldn't have your methods take in params that don't concern them!

To your main question:

How can I pass results between chained observables without the problems I've mentioned?

Use a single scope (nested pipes)

The code below is equivalent to your sample code, with no need to pass the unnecessary properties. The previously returned values are accessible by function calls further down the chain:

startUploadEvent$.pipe(
  concatMap(event => getAuthenticationHeaders(event).pipe(
    // assumes generateUploadId returns an Observable, as the nested .pipe implies
    concatMap(headers => generateUploadId(event, headers).pipe(
      tap(id => emitUploadStartEvent(id, event)),
      concatMap(id => createPdfDocument(event, headers, id)),
      concatMap(pdfId => uploadBilderForPdf(event, pdfId)),
      // closePdf returns an Observable, so it must be flattened rather than tapped
      mergeMap(cloudId => closePdf(cloudId, event))
    ))
  ))
).subscribe();

Notice how event and headers are accessible downstream. They do not need to be passed into functions that don't require them.

Is there an RxJS concept I've missed?

Maybe? Not really... :-)

The trick is to tack on a .pipe to effectively group operators so they all have access to the input params.

Usually, we try to keep the code flat inside the .pipe:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`)),
3     map(response => response.data.userName),
4     map(name => `Hello ${name}!`),
5     tap(greeting => console.log(greeting))
6   );

but that code is really no different than:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`).pipe(
3       map(response => response.data.userName),
4       map(name => `Hello ${name}! (aka User #${id})`)
5     )),
6     tap(greeting => console.log(greeting))
7   );

But, in the second case, line #4 has access to the name and the id, whereas in the first case it only has access to name.

Notice the signature of the first is userId$.pipe(switchMap(), map(), map(), tap())

The second is: userId$.pipe(switchMap(), tap()).
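
To watch the scoping work end to end, here is a minimal sketch using synchronous stand-ins. The helpers (getAuthenticationHeaders, createPdfDocument, uploadPdf) and their return values are assumptions made up for illustration, not the asker's real services.

```typescript
import { Observable, of } from 'rxjs';
import { concatMap, map, tap } from 'rxjs/operators';

// Hypothetical stand-ins for the real services; each emits once and completes.
const getAuthenticationHeaders = (event: string): Observable<string> =>
  of(`auth-for-${event}`);
const createPdfDocument = (event: string, headers: string): Observable<number> =>
  of(event.length + headers.length);
const uploadPdf = (pdfId: number): Observable<string> => of(`cloud-${pdfId}`);

const log: string[] = [];

of('upload-1').pipe(
  concatMap(event => getAuthenticationHeaders(event).pipe(
    // `event` and `headers` are both in scope for everything nested below.
    concatMap(headers => createPdfDocument(event, headers).pipe(
      concatMap(pdfId => uploadPdf(pdfId)),
      // The combined shape is built once, here; no helper passed values through.
      map(cloudId => ({ event, headers, cloudId }))
    ))
  )),
  tap(result => log.push(`${result.event} -> ${result.cloudId}`))
).subscribe();
```

Each helper only receives what it needs, and the outer pipe still sees a single flat value at the end.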

Curtilage answered 18/8, 2020 at 6:26 Comment(3)
Reviewing the answers to date, I think this is the best approach. It uses standard RxJS operators, the functions do not need pass-through parameters, and there is nothing out of the ordinary going on or required. - Testate
The reason "nested pipes" allow access to name and id is because of closure, which was what OP was asking for. :) - Trudeau
@Trudeau I see it the opposite way! The closure works because of the nested pipe. In the "flat" example above, closures are used in the 4 different operators, and they don't help achieve the goal. It's only when you nest the pipe that the closure becomes useful. Nesting the pipe puts multiple operators in the same closure (scope), which is the heart of what makes this work. - Curtilage
I
13

Your methods definitely shouldn't be coupled to the context, nor should they have to worry about mapping their result to a specific shape.

RxJS is all about functional programming, and in functional programming there is a pattern called Adapting Arguments to Parameters (ref).

It allows us to decouple method signatures from the context.

To achieve this you can write context-aware versions of the map, concatMap and mergeMap operators, so that the final solution looks like:

this.startUploadEvent$.pipe(
      map(withKey('event')),
      concatMap_(({event}) => this.getAuthenticationHeaders(event), 'headers'),
      map_(({ headers }) => this.generateUploadId(headers), 'id'),
      tap(({ event, id }) => this.emitUploadStartEvent(id, event)),
      concatMap_(({ id }) => this.createPdfDocument(id), 'pdfId'),
      concatMap_(({ pdfId }) => this.uploadBuilderForPdf(pdfId), 'cloudId'),
      mergeMap_(({ cloudId }) => this.closePdf(cloudId)),
      tap(({id, event, cloudId}) => this.emitUploadDoneEvent(id, event, cloudId)),
    ).subscribe(console.log);

Note the _ suffix on those operators.

Stackblitz Example

The goal of those custom operators is to take the parameters object, run it through the projection function, and add the result of the projection to the original parameters object.

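import { Observable, OperatorFunction } from 'rxjs';
import { map, concatMap, mergeMap } from 'rxjs/operators';

function map_<K extends string, P, V>(project: (params: P) => V): OperatorFunction<P, P>;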
function map_<K extends string, P, V>(project: (params: P) => V, key: K): OperatorFunction<P, P & Record<K, V>>;
function map_<K extends string, P, V>(project: (params: P) => V, key?: K): OperatorFunction<P, P> {
  return map(gatherParams(project, key));
}

function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
  return concatMap(gatherParamsOperator(projection, key));
}

function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
  return mergeMap(gatherParamsOperator(projection, key));
}

// https://github.com/Microsoft/TypeScript/wiki/FAQ#why-am-i-getting-supplied-parameters-do-not-match-any-signature-error
function gatherParams<K extends string, P, V>(fn: (params: P) => V): (params: P) => P;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key: K): (params: P) => P & Record<K, V>;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key?: K): (params: P) => P {
  return (params: P) => {
    if (typeof key === 'string') {
      return Object.assign({}, params, { [key]: fn(params) } as Record<K, V>);
    }

    return params;
  };
}

function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>): (params: P) => Observable<P>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key: K): (params: P) => Observable<P & Record<K, V>>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key?: K): (params: P) => Observable<P> {
  return (params: P) => {
    return fn(params).pipe(map(value => gatherParams((_: P) => value, key)(params)));
  };
}

function withKey<K extends string, V>(key: K): (value: V) => Record<K, V> {
  return (value: V) => ({ [key]: value } as Record<K, V>);
}

I used function overloads here because sometimes we don't need to add an additional key to the parameters. In the case of the this.closePdf(...) method, the parameters should simply pass through.

As a result, you get a decoupled version of what you had before, with type safety.

Doesn't it look like over-engineering?

In most cases you should follow the YAGNI (You aren't gonna need it) principle, and it is better not to add more complexity to existing code. For such a scenario you should stick to a simple way of sharing parameters between operators, as follows:

ngOnInit() {
  const params: Partial<Params> = {};
  this.startUploadEvent$.pipe(
    concatMap(event => (params.event = event) && this.getAuthenticationHeaders(event)),
    map(headers => (params.headers = headers) && this.generateUploadId(headers)),
    // `event` is not in scope here, so read it from the shared params object
    tap(id => (params.uploadId = id) && this.emitUploadStartEvent(id, params.event)),
    concatMap(id => this.createPdfDocument(id)),
    concatMap(pdfId => (params.pdfId = pdfId) && this.uploadBuilderForPdf(pdfId)),
    mergeMap(cloudId => (params.cloudId = cloudId) && this.closePdf(cloudId)),
    tap(() => this.emitUploadDoneEvent(params.uploadId, params.event, params.cloudId)),
  ).subscribe(() => {
    console.log(params)
  });
}

where Params type is:

interface Params {
  event: any;
  headers: any;
  uploadId: any;
  pdfId: any;
  cloudId: any;
}

Please note the parentheses around the assignments, e.g. (params.cloudId = cloudId).
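
One caveat with the `(assignment) && next` idiom: it only works while the assigned value is truthy. If an id could ever be 0 or an empty string, the expression short-circuits and yields that falsy value instead of the next call. The sketch below shows this in plain TypeScript; `next` is a made-up placeholder for the following step, and the comma operator is offered as one possible alternative, not as what the answer's author used.

```typescript
// Plain TypeScript: no RxJS is needed to see the short-circuit.
interface Params { uploadId?: number; }

const params: Params = {};

// Hypothetical placeholder for "the next step in the chain".
const next = (id: number): string => `pdf-for-${id}`;

// Idiom from the answer: relies on the assigned value being truthy.
const viaAnd = (id: number) => (params.uploadId = id) && next(id);

// Comma operator: performs the assignment, then always evaluates to next(id).
const viaComma = (id: number) => ((params.uploadId = id), next(id));

const a = viaAnd(7);   // assignment is truthy, so next(7) runs
const b = viaAnd(0);   // assignment evaluates to 0, so next(0) never runs
const c = viaComma(0); // next(0) runs regardless of the assigned value
```

For events, header objects and non-empty ids the two forms behave the same; the difference only shows up with falsy values.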

Stackblitz Example


There are also many other approaches, but they require changing how you use RxJS operators.

Ideogram answered 16/8, 2020 at 4:15 Comment(4)
In my opinion this is (although it seems cool) a typical example of over-engineering. Unless it's used in many other places, it's better to avoid it. - Cowen
@Cowen Can't argue. That's definitely true. I added a simplified alternative solution. - Ideogram
The simplified solution with params: Partial<Params> would result in TS errors with the strictNullChecks: true compiler option. - Elevenses
Thanks for the Medium link; the article covers pretty good options, including their upsides and downsides. - Angelinaangeline
T
5

You can:

  • assign the result of each action to an observable

  • chain subsequent function calls based on earlier results

  • those results can be reused in later action calls via withLatestFrom

  • shareReplay is used to prevent the later withLatestFrom subscriptions causing earlier functions to re-execute

    function startUpload(event$: Observable<string>) {
      const headers$ = event$.pipe(
        concatMap(event => getAuthenticationHeaders(event)),
        shareReplay()
        );
    
      const id$ = headers$.pipe(
        map(() => generateUploadId()),
        shareReplay()
        );
    
      const emitUploadEvent$ = id$.pipe(
        withLatestFrom(event$),   // use earlier result
        map(([id, event]) => emitUploadStartEvent(id, event)),
        shareReplay()
        );
    
       // etc
    }
    

As above, the functions only take the parameters they require and there is no pass-through.

Demo: https://stackblitz.com/edit/so-rxjs-chaining-1?file=index.ts

This pattern can be simplified by use of an rxjs custom operator (note: this could be refined further, including typing):

function call<T, R, TArgs extends any[], OArgs extends Observable<any>[]>(
  operator: (func: ((a: TArgs) => R)) => OperatorFunction<TArgs,R>,
  action: (...args: any[]) => R,
  ignoreInput: boolean,
  ...observableArgs: OArgs
): (args: Observable<T>) => Observable<R> {
  return (input: Observable<T>) => input.pipe(
    withLatestFrom(...observableArgs),
    operator((args: any[]) => action(...args.slice(ignoreInput ? 1: 0))),
    shareReplay(1)
  );
}

Which can be used like:

function startUpload(event$: Observable<string>) {
  const headers$ = event$.pipe(
    // keep the piped event: getAuthenticationHeaders(event)
    call(concatMap, getAuthenticationHeaders, false)
  );

  const id$ = headers$.pipe(
    // drop the piped headers: generateUploadId()
    call(map, generateUploadId, true)
  );

  const startEmitted$ = id$.pipe(
    // keep the piped id: emitUploadStartEvent(id, event)
    call(map, emitUploadStartEvent, false, event$)
  );

  const pdfId$ = startEmitted$.pipe(
    call(map, createPdfDocument, true, event$, headers$, id$)
  );

  const uploaded$ = pdfId$.pipe(
    call(map, uploadBuilderForPdf, true, event$, pdfId$, headers$, id$)
  );

  const cloudId$ = uploaded$.pipe(
    call(map, closePdf, true, headers$, pdfId$)
  );

  const uploadDone$ = cloudId$.pipe(
    call(map, emitUploadDoneEvent, true, id$, event$)
  );

  // return cloudId$ instead of uploadDone$ but preserve observable chain
  return uploadDone$.pipe(concatMap(() => cloudId$));
}

Demo: https://stackblitz.com/edit/so-rxjs-chaining-4?file=index.ts

Testate answered 16/8, 2020 at 11:51 Comment(0)
B
4

Could you use an object for the set of data? Something like this:

Interface:

export interface Packet {
  event: string;
  headers?: string;
  id?: number;
  pdfId?: number;
  cloudId?: number;
}

Then in the code, something like this:

Service:

  this.startUploadEvent$.pipe(
    concatMap(packet => this.doThingOne(packet)),
    map(packet => this.doThingTwo(packet)),
    tap(packet => this.doThingThree(packet)),
    // ...
  );

That way each method can use the bits of the object it needs and pass along the rest. Though this does require changing each of the methods to take in and work with the object.
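
One way to soften the coupling is to keep the real work in context-free helpers and let thin adapters do the packing and unpacking. The following is only a sketch under that assumption: getHeaders, generateId, withHeaders and withId are hypothetical names, and the trimmed Packet is redeclared so the snippet stands alone.

```typescript
import { Observable, of } from 'rxjs';
import { concatMap, map } from 'rxjs/operators';

// Trimmed copy of the Packet interface so this sketch is self-contained.
interface Packet {
  event: string;
  headers?: string;
  id?: number;
}

// Hypothetical context-free helpers: they know nothing about Packet.
const getHeaders = (event: string): Observable<string> => of(`auth-${event}`);
const generateId = (headers: string): number => headers.length;

// Thin adapters copy the packet and add one field each.
const withHeaders = (packet: Packet): Observable<Packet> =>
  getHeaders(packet.event).pipe(map(headers => ({ ...packet, headers })));

const withId = (packet: Packet): Packet =>
  ({ ...packet, id: generateId(packet.headers ?? '') });

const start: Packet = { event: 'e1' };
const results: Packet[] = [];

of(start).pipe(
  concatMap(withHeaders),
  map(withId)
).subscribe(packet => results.push(packet));
```

The helpers stay easy to unit test, while the adapters are the only code that knows about the Packet shape.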

Bethought answered 13/8, 2020 at 20:22 Comment(2)
This enhances the readability since it reduces the destructuring. But still, every doThing method receives a parameter it doesn't need and returns a value which isn't related to its own context. As an example: we wouldn't design a method to calculate the age of a person like this: private calcAge(person: Person): Person {...}. Instead we would design it like this: private calcAge(birthdate: date): number {...}, which is more generic, easy to test and not coupled to the context it is running in. - Pothouse
Then, as stated above, nested pipes might be what you are looking for. - Bethought
A
4

As far as I understood you, you are concerned about readability and not having to carry the payload from method to method.

Have you ever thought about converting an Observable to a Promise? The important thing here is that the observables must complete so that the promise can be resolved (resolving is the promise counterpart of completing).

Your own hint above (like with async await) led me to this suggestion.

private async startUpload(event: StartUploadEvent) {
    const headers = await this.getAuthenticationHeaders(event).toPromise();
    const id = await this.generateUploadId().toPromise();
    
    this.emitUploadStartEvent(id, event);
    
    const pdfId = await this.createPdfDocument(event, headers, id).toPromise();
    await this.uploadBilderForPdf(event, pdfId, headers, id).toPromise();
    
    const cloudId = await this.closePdf(headers, pdfId).toPromise();
    this.emitUploadDoneEvent(id, event, cloudId)
    
    return cloudId
}

Info: Here you can read what happens if you convert an observable into a promise without the observable completing: Why converted promise from Subject (Observable) does not work as expected
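
On RxJS 7 and later, toPromise() is deprecated in favour of lastValueFrom and firstValueFrom. Below is a shortened sketch of the same idea with lastValueFrom; the two helper functions are simplified stand-ins with assumed signatures, not the real HTTP calls.

```typescript
import { EMPTY, Observable, lastValueFrom, of } from 'rxjs';

// Hypothetical one-shot calls standing in for the HTTP requests.
const getAuthenticationHeaders = (event: string): Observable<string> =>
  of(`auth-${event}`);
const closePdf = (headers: string): Observable<string> =>
  of(`cloud-${headers}`);

async function startUpload(event: string): Promise<string> {
  // Resolves with the last emitted value once the source completes.
  const headers = await lastValueFrom(getAuthenticationHeaders(event));
  return lastValueFrom(closePdf(headers));
}

// A source that completes without emitting rejects unless a default is given.
const fallback = lastValueFrom(EMPTY, { defaultValue: 'none' });
```

The completion requirement still applies: a source that never completes leaves the promise pending.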

Note: I'm fulfilling your expectations accordingly.

There may be other ways to solve the problem that do not violate common best practices.

Aerolite answered 16/8, 2020 at 9:37 Comment(2)
This is what I had in mind with promises. For me this is the most readable and practical solution. But I have the feeling of missing something, or of not understanding the full concept of RxJS, when falling back to (the boilerplate-free :) ) promises. - Pothouse
I was thinking of the same approach. In my humble opinion, if a common best practice makes your code hard to read or understand, or makes you pass around unnecessary arguments to functions that don't need them, then it is no longer a 'best practice' for your case. @serkan-sepahi's solution seems simple and easy, and it is the way I would go. - Unsex
E
2

You are right about the problems such code produces. The abstract solution is to move the responsibility for combining the results and passing the correct arguments to each call from the methods to the pipe.

A few improvements can be made very easily. The tap operator does not modify the value, so you can remove unneeded properties from its destructuring. map just transforms the result, so instead of

map(({ event, headers }) => this.generateUploadId(event, headers)),

we can write

map(({ event, headers }) => ({
  event,
  headers,
  id: this.generateUploadId(event, headers)
}))

and this.generateUploadId does not have to return an object anymore.

As for higher-order mapping operators, a few options came to mind. First of all, most of the 'xMap' operators support a result selector as the last argument, and its purpose is exactly what we need: combining the source value with the result. Result selectors were deprecated, so nested pipes are the current way to go, but let's take a look at how it could look using a result selector.

Option 0. Result Selector (deprecated)

this.startUploadEvent$
  .pipe(
    concatMap(
      event => this.getAuthenticationHeaders(event),
      (event, headers) => ({ event, headers }) // <-- Result Selector
    )
  );

Option 1. Nested Pipes (aka "use closures")

It looks very similar to Option 0, but event is kept in the closure instead of the inner observable.

this.startUploadEvent$
  .pipe(
    concatMap(
      event => this.getAuthenticationHeaders(event)
        .pipe(map(headers => ({ event, headers })))
    )
  );

Option 2. Custom Operator (Closures here as well)

It is possible to make a custom operator and get a syntax pretty similar to Result Selectors:

function withResultSelector(operator, transformer) {
  let sourceValue;
  return pipe(
    tap(value => (sourceValue = value)),
    operator,
    map(value => transformer(sourceValue, value))
  );
}

Usage:

this.startUploadEvent$
  .pipe(
    withResultSelector(
      concatMap(event => this.getAuthenticationHeaders(event)),
      (event, headers) => ({ event, headers })
    )
  );

Going further it is possible to extract repetitive stuff and make everything more functional:

const mergeAs = propName => (a, b) => ({ ...a, [propName]: b });
const opAndMergeAs = (operator, propName) => withResultSelector(operator, mergeAs(propName));

this.startUploadEvent$
  .pipe(
    opAndMergeAs(concatMap(event => this.getAuthenticationHeaders(event)), "headers")
  );

It may be a bit cumbersome to write proper types for that, but that is a different problem.
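
For the mergeAs part at least, the typing is manageable. This is one possible sketch, not the typing from the playground: it captures the property name as a literal type so the merged object is known to carry both the old properties and the new one. Typing withResultSelector itself is left out here.

```typescript
// Typed variant of mergeAs: K is inferred as the literal key, so the result
// type contains the original properties plus the newly added one.
const mergeAs =
  <K extends string>(propName: K) =>
  <A extends object, B>(a: A, b: B): A & Record<K, B> =>
    Object.assign({}, a, { [propName]: b } as Record<K, B>);

const addHeaders = mergeAs('headers');
const merged = addHeaders({ event: 'e1' }, 'auth-token');

// Both properties are visible to the type checker without further casts.
const summary: string = `${merged.event}:${merged.headers}`;
```

The one remaining cast is on the computed-key object literal, which TypeScript otherwise widens to a string index signature.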

Playground I used writing the answer.

Evenhanded answered 15/8, 2020 at 23:56 Comment(2)
It's sad that the resultSelector is deprecated; it helps to separate the actual processing from preparing the result for the next stages. I think a map(...) is missing within the second pipe in Option 1. - Pothouse
Agreed, resultSelector had a specific semantic and looked pretty natural, but it should be possible to get used to nested pipes after some practice and a change in the way of thinking. - Evenhanded
D
2

You're right about the concerns and problems you mentioned. The real issue I see is shifting your mindset from an imperative approach to a reactive/functional one. But let us review the imperative code first:

private startUpload(event: StartUploadEvent) {
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id, event)

    const pdfId = this.createPdfDocument(event, headers, id)
    this.uploadBilderForPdf(event, pdfId, headers, id)

    const cloudId = this.closePdf(headers, pdfId)
    this.emitUploadDoneEvent(id, event, cloudId)

    return cloudId
}

Here things look cleaner: you have the event, you pass it along, get back only what you want, and hand that to the next function. We want to move this code to the reactive/functional approach.

The main problem, from my point of view, is that you made your functions lose their context. For example, getAuthenticationHeaders should not return the event at all; it should only return headers, and the same goes for the other functions.

When dealing with RxJS (aka the reactive approach), you run into these issues a lot, and that's OK: it keeps functional concepts applied and your code more predictable, because pure operators should only deal with data in the same pipeline. That keeps everything pure and avoids side effects, which lead to unpredictable code.

I think what you are looking for will be solved with nested pipes (this is the best solution in my opinion):

concatMap(event => this.getAuthenticationHeaders(event).pipe(
    map(headers => this.generateUploadId(event, headers).pipe())
))

This pattern is used heavily in some RxJS backend libraries like Marble.js.

You can also use an approach similar to Result Selector:

concatMap(event => this.getAuthenticationHeaders(event).pipe(
  map(headers => ({ headers, event }))
)),

The other great solutions people suggested will also work, but you'll still have the same problems you mentioned, just with cleaner, more readable code.

You can also turn it into an async/await approach, but you'll lose the reactivity that RxJS provides.

What I can suggest is to read more about reactive programming and how to shift your mindset towards it. I'll provide some links here that I find great to start with. Also try some libraries built on top of RxJS, like CycleJS. I recommend reading about functional programming as well, which will help a lot, from these great books: Mostly adequate guide to FP (in javascript) and Composing Software.

I recommend the great talk RxJS Recipes, which will change the way you use RxJS.

Useful Resources:

Doridoria answered 19/8, 2020 at 6:43 Comment(0)
P
1

Four methods are shown in this answer. Each of them uses the same four sequential streams (so each backend call may use the response of a previous stream).

(Some of them are already mentioned in answers above, this is just for the sake of comparison on the same inputs. This example assumes that the streams are http calls, with one emission - so the flattening operator can be mergeMap, concatMap or switchMap.)

Each method has pros and cons.

Common preparation code:

import { of, map, Observable, exhaustMap, lastValueFrom } from 'rxjs';

const a$ = of('a');
const b$ = of('b');
const c$ = of('c');
const d$ = of('d');

interface Result {
  a: string;
  b: string;
  c: string;
  d: string;
}

Nested pipe with map / Previously known as Result Selector

This solution has two levels of nested pipes, so it might be a little less readable and more error prone.

function makeBeCalls(): Observable<Result> {
  return a$
      .pipe(
          exhaustMap(a =>
              b$.pipe(
                  map(b => ({b, a}))
              )
          ),
          exhaustMap(({b, a}) =>
              c$.pipe(
                  map(c => ({b, a, c}))
              )
          ),
          exhaustMap(({a, b, c}) =>
              d$.pipe(
                  map(d => ({a, b, c , d}))
              )
          )
      );
}

makeBeCalls().subscribe(v => console.log(v))

Using a shared variable to keep previous values

Here we need an extra temporary variable to keep subsequent emissions, so the stream has a side effect. But the solution seems shorter and more readable.

function makeBeCalls1(): Observable<Result> {
  const events: Result = { a: null, b: null, c: null, d: null };
  return a$
      .pipe(
          exhaustMap(a => (events.a = a) && b$),
          exhaustMap(b => (events.b = b) && c$),
          exhaustMap(c => (events.c = c) && d$),
          map(d => (events.d = d) && events) 
      );
}

Using nested pipe / closure scope

This solution nests each stream, so in the end it resembles callback hell. I think this solution is the least readable and the most error prone. It might be OK for two levels of pipes.

function makeBeCalls2(): Observable<Result> {
  return a$
      .pipe(
          exhaustMap(a => b$
              .pipe(
                  exhaustMap(b => c$
                      .pipe(
                          exhaustMap(c => d$
                              .pipe(
                                  map(d => ({a, b, c, d}))
                              )
                          )
                      )
                  )
              )
          )
      );
}

makeBeCalls2().subscribe(v => console.log(v))

And finally using Promises with async / await

I think that this is the most readable and, in the case of HTTP calls, the best solution. Some people might prefer reactive ways though.

async function makeBeCalls3(): Promise<Result> {
  const a = await lastValueFrom(a$);
  const b = await lastValueFrom(b$);
  const c = await lastValueFrom(c$);
  const d = await lastValueFrom(d$);
  
  return {a, b, c, d}
}

makeBeCalls3().then(v => console.log(v))

Stackblitz.

Plinth answered 8/1, 2023 at 14:11 Comment(0)
