ForkJoin 2 BehaviorSubjects
Asked Answered
T

3

17

I have two behaviour subject streams what I'm trying to forkJoin with no luck. As I imagined it gives back the two last values of it. Is this possible to implement it somehow?

It is not called after the subject.

let stream1 = new BehaviorSubject(2);
let stream2 = new BehaviorSubject('two');

Observable.forkJoin(stream1, stream2)
    .subscribe(r => {
         console.log(r);
    });
Tanika answered 27/9, 2016 at 10:36 Comment(0)
M
27

Maybe you want to use combineLatest instead of forkJoin, it's useful if you don't want to wait for a complete() call.

With combineLatest, when any source observable (in your case, your behavior subjects) emits a value, combineLatest will trigger:

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

combineLatest(stream1, stream2)
    .subscribe(r => {
         console.log(r);
    });

stream1.next(3);
stream2.next('three');

Console log:

(2) [2, "two"] // initial state

(2) [3, "two"] // next triggered on stream1

(2) [3, "three"] // next triggered on stream2

Live demo: https://stackblitz.com/edit/rxjs-qzxo3n

Mandorla answered 27/2, 2020 at 12:12 Comment(0)
O
24

Note what forkJoin() actually does from its documentation:

Wait for Observables to complete and then combine last values they emitted.

This means that forkJoin() emits a value when all input Observable are complete. When using BehaviorSubject this means explicitly calling complete() on both of them:

import { Observable, BehaviorSubject, forkJoin } from 'rxjs';

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

forkJoin(stream1, stream2)
  .subscribe(r => {
    console.log(r);
  });

stream1.complete();
stream2.complete();

See live demo: https://stackblitz.com/edit/rxjs-9nqtx6

March 2019: Updated for RxJS 6.

Obstreperous answered 27/9, 2016 at 14:18 Comment(1)
Thank you for your answer! Have you tested your code, because the log is not invoked in my computer?Tanika
C
5

You can use take(1) pipe or complete() method that mentioned above.

private subjectStream1 = new BehaviorSubject(null);
stream1$: Observable = this.subjectStream1.asObservable();

private subjectStream2 = new BehaviorSubject(null);
stream2$: Observable = this.subjectStream2.asObservable();

forkJoin({
  stream1: this.stream1$.pipe(take(1)),
  stream2: this.stream2$.pipe(take(1))
})
.pipe(takeUntil(this._destroyed$))
.subscribe(values) => console.log(values));
Coquille answered 13/5, 2021 at 10:15 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.