switchMap distinct by property in RxJS

Let's say I have a stream of actions, and each action has an id. Like this:

const actions$ = of({ id: 1 }, { id: 2 }, { id: 1 });

Now, for each action, I want to perform some logic in switchMap:

actions$.pipe(switchMap(a => /* some cancellable logic */)).subscribe(...);

The problem is that each emitted action cancels the previous 'some cancellable logic', regardless of its id.

Is it possible to cancel 'some cancellable logic' based on the action id, preferably with an operator? Something like:

actions$.pipe(switchMapBy('id', a => /*some cancellable logic */)).subscribe(...)

Essentially, current behaviour with switchMap:
1. actions$ emits id #1. switchMap subscribes to nested observable.
2. actions$ emits id #2. switchMap unsubscribes from previous nested observable. Subscribes to new one.
3. actions$ emits id #1. switchMap again unsubscribes from previous nested observable. Subscribes to new one.

Expected behaviour:
1. actions$ emits id #1. switchMap subscribes to nested observable.
2. actions$ emits id #2. switchMap again subscribes to nested observable (this time with #2). Here's the difference: it doesn't cancel the one from #1.
3. actions$ emits id #1. switchMap unsubscribes from nested observable for #1. Subscribes again, for #1.
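
The expected behaviour above can be modelled without RxJS as a map from id to a cancel function. This is only a sketch of the semantics, not an RxJS operator: the `KeyedSwitch` class and its `start` method are hypothetical names introduced for illustration.

```typescript
// Hypothetical helper (not part of RxJS): models "switch per key" with plain callbacks.
type Cancel = () => void;

class KeyedSwitch<K> {
  // One running task per key; the stored function cancels it.
  private readonly running = new Map<K, Cancel>();

  // Start a task for `key`, cancelling only the task previously started for the same key.
  start(key: K, task: () => Cancel): void {
    const previous = this.running.get(key);
    if (previous) {
      previous();
    }
    this.running.set(key, task());
  }
}

const log: string[] = [];
const keyed = new KeyedSwitch<number>();

for (const action of [{ id: 1 }, { id: 2 }, { id: 1 }]) {
  keyed.start(action.id, () => {
    log.push(`subscribe #${action.id}`);
    return () => {
      log.push(`cancel #${action.id}`);
    };
  });
}

console.log(log);
// [ 'subscribe #1', 'subscribe #2', 'cancel #1', 'subscribe #1' ]
```

The task for #2 is never cancelled, while the second action with id #1 cancels only the first task for #1.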

Halvah answered 6/7, 2019 at 20:5 Comment(7)
what you're after is fairly unclear, I'd try to add some concrete examples of the behavior you're looking for. - Vodka
Edited. I'd better learn to draw marble diagrams. - Halvah
much clearer now. seems like you're more after the mergeMap operator - Vodka
I'm not sure. mergeMap won't cancel previous operation when action with id=1 is emitted for the second time. - Halvah
you'd need some custom logic to cancel the other subscriptions, it won't do it automatically, trying to come up with a solution, mergeScan might be more appropriate for this case. - Vodka
I've created custom operator stackblitz.com/edit/rxjs-wesdoj. Implementation is pretty messy, but it represents what I need. - Halvah
answer posted, decided mergeMap was the appropriate operator. custom can work, but this seems simple enough when all is said and done. - Vodka

This seems to be a use case for the mergeMap operator. switchMap maintains only one inner subscription and cancels the previous one, which is not what you're after here. You want multiple inner subscriptions, each cancelled when a new value with the same id comes through, so you need some custom logic to do that.

Something along the lines of:

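import { filter, mergeMap, takeUntil } from 'rxjs/operators';

action$.pipe(
  mergeMap(val =>
    (/* your transform logic here */).pipe(
      // Cancel this inner observable when the same id comes back through.
      // Put this operator at the correct point in the chain.
      // Note: this subscribes to action$ again, so action$ should be hot (e.g. a Subject).
      takeUntil(action$.pipe(filter(a => a.id === val.id)))
    )
  )
)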

You can turn this into something reusable by writing a custom operator:

import { OperatorFunction, Observable, from } from 'rxjs';
import { takeUntil, filter, mergeMap } from 'rxjs/operators';

export function switchMapBy<T, R>(
  key: keyof T,
  mapFn: (val: T) => Observable<R> | Promise<R>
): OperatorFunction<T, R> {
  return input$ => input$.pipe(
    mergeMap(val => 
      from(mapFn(val)).pipe(
        takeUntil(input$.pipe(filter(i => i[key] === val[key])))
      )
    )
  );
}

and use it like:

action$.pipe(
  switchMapBy('id', (val) => /* your transform logic here */)
);

Here's a StackBlitz of it in action: https://stackblitz.com/edit/rxjs-x1g4vc?file=index.ts

Vodka answered 6/7, 2019 at 21:1 Comment(2)
Thanks! That's what I needed. I think I'll just hide it behind a custom operator, as it'll be used in a couple of places. - Halvah
I added an example of how to turn my solution into a reusable custom switchMapBy operator. - Vodka

Use the filter operator before switchMap to exclude the cancelled ids, like this:

of({ id: 1 }, { id: 2 }, { id: 1 }).pipe(
   filter(a => ![1, 2].includes(a.id)), // exclude actions whose id is 1 or 2
   switchMap(a => /* some cancellable logic */)
).subscribe(...)
Matriculate answered 6/7, 2019 at 20:26 Comment(0)

bryan60's answer has an issue. If you add a tap before switchMapBy, you will see that it is sometimes called multiple times per emission. To get rid of this problem, I edited the operator.

bryan60's code:

import { OperatorFunction, Observable, of, interval, Subject } from 'rxjs';
import { takeUntil, filter, mergeMap, map, tap } from 'rxjs/operators';

function switchMapBy<T, R>(key: keyof T, mapFn: (val: T) => Observable<R>): OperatorFunction<T, R> {
  return input$ => input$.pipe(
    mergeMap(val => {
      return mapFn(val).pipe(
        takeUntil(input$.pipe(filter(i => i[key] === val[key])))
      )
    })
  );
}

const source = new Subject<{id: number}>();

const sample$ = of({id: 1}, {id: 2}, {id: 1});

source.pipe(
  tap((a) => console.log('test')),
  switchMapBy('id', (val) => interval(1000).pipe(map(v => ({val, v}))))
  ).subscribe(v => console.log(v));

sample$.subscribe(source);

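In the console you will see "test" 6 times instead of 3, because every inner observable subscribes to input$ again for its takeUntil notifier, which re-runs the tap for each later emission.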

Edited operator:

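function switchMapBy<T, R>(
  key: keyof T,
  mapFn: (val: T) => Observable<R>
): OperatorFunction<T, R> {
  return (input$) => {
    // Relay source values through a Subject so the inner observables
    // never re-subscribe to input$ (and never re-run upstream side effects).
    const sub$ = new Subject<T>();
    return input$.pipe(
      tap((value) => sub$.next(value)),
      mergeMap((val) =>
        mapFn(val).pipe(
          // Cancel when a later value with the same key arrives.
          takeUntil(sub$.pipe(filter((i) => i[key] === val[key])))
        )
      )
    );
  };
}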

With this version, "test" is logged only 3 times.
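
Another way to get per-key switching, offered here as a sketch rather than something taken from the answers above, is to split the stream with groupBy and run a switchMap inside each group. The operator name `switchMapByKey`, the `Action` interface, and the logging projection are hypothetical and exist only for this example. It assumes RxJS 6 or 7, where these operators are importable from 'rxjs/operators'.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { groupBy, mergeMap, switchMap } from 'rxjs/operators';

interface Action {
  id: number;
}

// Hypothetical operator name; built only from standard RxJS operators.
function switchMapByKey<T, K, R>(
  keyFn: (value: T) => K,
  project: (value: T) => Observable<R>
): OperatorFunction<T, R> {
  return (source$) =>
    source$.pipe(
      // One group per distinct key.
      groupBy(keyFn),
      // Within a group, a new value cancels only that group's previous inner observable.
      mergeMap((group$) => group$.pipe(switchMap(project)))
    );
}

const log: string[] = [];
const actions$ = new Subject<Action>();

actions$
  .pipe(
    switchMapByKey(
      (a: Action) => a.id,
      (a: Action) =>
        // Stand-in for "some cancellable logic": records subscribe and unsubscribe.
        new Observable<never>(() => {
          log.push(`start ${a.id}`);
          return () => {
            log.push(`cancel ${a.id}`);
          };
        })
    )
  )
  .subscribe();

actions$.next({ id: 1 });
actions$.next({ id: 2 });
actions$.next({ id: 1 });

console.log(log);
// [ 'start 1', 'start 2', 'cancel 1', 'start 1' ]
```

Because the source is subscribed only once, upstream side effects such as a tap run once per emission. One trade-off to keep in mind: each group stays open until the source completes, so a stream with many distinct ids keeps one group per id alive.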

Khalsa answered 22/9, 2023 at 12:28 Comment(0)