How can I `await` on an Rx Observable?

I'd like to be able to await on an observable, e.g.

const source = Rx.Observable.create(/* ... */)
//...
await source;

A naive attempt results in the await resolving immediately instead of waiting for the observable to emit.

Edit: The pseudocode for my full intended use case is:

if (condition) {
  await observable;
}
// a bunch of other code

I understand that I can move the other code into another separate function and pass it into the subscribe callback, but I'm hoping to be able to avoid that.

Heger answered 9/12, 2015 at 22:40 Comment(1)
Can you not move the remaining code (which you want to wait for the source) into a .subscribe() method call? - Pounds
224

await only waits on a promise (or another thenable), and an observable is not one. Convert the observable's next event to a promise and await that.

if (condition) {
  await observable.first().toPromise();
}

Edit note: This answer originally used .take(1) but was changed to use .first() which avoids the issue of the Promise never resolving if the stream ends before a value comes through.

toPromise is deprecated as of RxJS 7 and will be removed in RxJS v8. In RxJS 7+, the above can be replaced with await firstValueFrom(observable).
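To make the idea concrete, here is a minimal sketch that deliberately does not use RxJS. makeSource and firstValue are hypothetical stand-ins written for this illustration (they are not RxJS APIs): the first plays the role of an observable, the second roughly mimics what firstValueFrom does. The sketch also shows that awaiting the source object directly does not wait for any value.

```javascript
// Hypothetical stand-in for an observable (not an RxJS API): emits each
// value after `delayMs` milliseconds, then completes.
function makeSource(values, delayMs) {
  return {
    subscribe({ next, complete }) {
      let cancelled = false;
      const timer = setTimeout(() => {
        for (const v of values) {
          if (cancelled) return;
          next(v);
        }
        if (!cancelled) complete();
      }, delayMs);
      return {
        unsubscribe() {
          cancelled = true;
          clearTimeout(timer);
        },
      };
    },
  };
}

// Hypothetical helper in the spirit of firstValueFrom: resolves with the
// first value and unsubscribes; rejects if the source completes empty.
function firstValue(source) {
  return new Promise((resolve, reject) => {
    let settled = false;
    const subscription = source.subscribe({
      next(value) {
        settled = true;
        resolve(value);
        // Guard against sources that emit synchronously during subscribe().
        if (subscription) subscription.unsubscribe();
      },
      complete() {
        if (!settled) reject(new Error('no elements in sequence'));
      },
    });
    if (settled) subscription.unsubscribe();
  });
}

async function run(condition) {
  const log = [];
  const source = makeSource(['ready'], 10);

  // The source is not a thenable, so `await` just hands it back
  // without waiting for any value.
  const same = await source;
  log.push(same === source ? 'await source: did not wait' : 'unexpected');

  if (condition) {
    const value = await firstValue(source); // actually waits here
    log.push(`first value: ${value}`);
  }
  log.push('rest of the code');
  return log;
}
```

Calling run(true) waits for the value before reaching the rest of the code, which is the shape asked for in the question; with real RxJS 7+ the helper would simply be firstValueFrom.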

Yoicks answered 9/12, 2015 at 23:25 Comment(14)
Instead of take(1) could you use await observable.first().toPromise();? - Eudemon
@Eudemon That should come out the same. - Yoicks
@Eudemon If there were no values on completion, first() will result in rejection, and take(1) will result in a pending promise. - Nonego
@estus: So, in general, it makes more sense to call first(), right? - Mackinaw
@Eudemon @AgentME Actually you should use neither take(1) nor first() in cases like this. Since you are expecting exactly ONE event to happen, you should use single(), which will throw an exception if there is more than one, while not throwing an exception when there is none. If there is more than one, there is likely something wrong in your code, data model, etc. If you do not use single(), you will end up arbitrarily picking the first item that returns, without any warning that there are more. You would have to take care in your predicate on the upstream data source to always maintain the same order. - Encomiast
@Encomiast Note that .single() waits until the source stream ends before it even outputs the value and ends, which may not work well in all situations that .first() or .take(1) would work. - Yoicks
Don't forget the import: import 'rxjs/add/operator/first'; - Mispleading
Now that toPromise() is deprecated, how should we do this? - Holliehollifield
@Holliehollifield I can't find anything to indicate toPromise() is deprecated. Where did you read that? - Chemist
import { first } from 'rxjs/operators'; now - Tupelo
@EstusFlask Actually take(1) will not yield a pending promise. It will yield a promise resolved with undefined. - Hibbitts
Maybe it worked before, but now pipe is needed: observable.pipe(first()).toPromise(). - Chlorohydrin
Use the new firstValueFrom() or lastValueFrom() instead of toPromise. Available in RxJS 7+. See: indepth.dev/rxjs-heads-up-topromise-is-being-deprecated - Auroraauroral
I've used just observable.toPromise() many times without issue. If you really only need the first one, then use observable.first().toPromise(); otherwise the plain implementation will take the last value after completion. - Auroraauroral
76

Use the new firstValueFrom() or lastValueFrom() instead of toPromise(), which, as pointed out here, is deprecated starting in RxJS 7 and will be removed in RxJS 8.

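import { firstValueFrom, lastValueFrom } from 'rxjs';

// Resolves with the first emitted value, then unsubscribes.
this.myProp = await firstValueFrom(myObservable$);
// Resolves with the last emitted value once the observable completes.
this.myProp = await lastValueFrom(myObservable$);

This is available in RxJS 7+.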

See: https://indepth.dev/rxjs-heads-up-topromise-is-being-deprecated/

Auroraauroral answered 4/9, 2020 at 16:27 Comment(0)
31

It likely has to be

await observable.first().toPromise();

As noted in the comments, there is a substantial difference between the take(1) and first() operators when the observable completes without emitting a value.

Observable.empty().first().toPromise() will result in rejection with EmptyError that can be handled accordingly, because there really was no value.

And Observable.empty().take(1).toPromise() will result in resolution with undefined value.
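The two outcomes can be sketched without RxJS. In the snippet below, emptySource, firstOrReject, and lastOrUndefined are hypothetical helpers written for this illustration, not RxJS APIs. They only mimic the behaviours described above: rejecting when the source completes with no value (as first() does, where RxJS uses EmptyError rather than the plain Error used here), versus resolving with undefined (as toPromise() does on an empty source).

```javascript
// Hypothetical stand-in for a completed, empty observable: it calls
// complete() right away and never calls next().
const emptySource = {
  subscribe(observer) {
    observer.complete();
    return { unsubscribe() {} };
  },
};

// Mimics first(): reject if completion arrives before any value.
// A reject() after resolve() is a no-op, so a non-empty source is safe.
function firstOrReject(source) {
  return new Promise((resolve, reject) => {
    source.subscribe({
      next: resolve,
      complete: () => reject(new Error('no elements in sequence')),
    });
  });
}

// Mimics toPromise(): resolve on completion with the last value seen,
// which is undefined when nothing was emitted.
function lastOrUndefined(source) {
  return new Promise((resolve) => {
    let last;
    source.subscribe({
      next: (value) => {
        last = value;
      },
      complete: () => resolve(last),
    });
  });
}

async function demo() {
  const results = {};
  try {
    await firstOrReject(emptySource);
    results.first = 'resolved';
  } catch (err) {
    // The empty case is reported as an error that can be handled.
    results.first = `rejected: ${err.message}`;
  }
  // The empty case is indistinguishable from a source that emitted undefined.
  results.last = await lastOrUndefined(emptySource);
  return results;
}
```

With the rejecting variant, the "no value" case is handled by wrapping the await in try/catch; with the other variant the caller has to check for undefined explicitly.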

Nonego answered 30/5, 2017 at 9:24 Comment(2)
Actually take(1) will not yield a pending promise. It will yield a promise resolved with undefined. - Hibbitts
Thanks for noticing, that's correct. I'm not sure why the post differed, possibly the behaviour changed at some point. - Nonego
23

Edit:

.toPromise() is now deprecated in RxJS 7 (source: https://rxjs.dev/deprecations/to-promise)

New answer:

As a replacement for the deprecated toPromise() method, you should use one of the two built-in static conversion functions, firstValueFrom or lastValueFrom.

Example:

import { interval, lastValueFrom } from 'rxjs';
import { take } from 'rxjs/operators';
 
async function execute() {
  const source$ = interval(2000).pipe(take(10));
  const finalNumber = await lastValueFrom(source$);
  console.log(`The final number is ${finalNumber}`);
}
 
execute();
 
// Expected output:
// "The final number is 9"

Old answer:

If toPromise is deprecated for you, you can use .pipe(take(1)).toPromise(), but as you can see here, it's not deprecated.

So just use toPromise (RxJS 6), as shown:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = sample('First Example')
  .toPromise()
  //output: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

async/await example:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise and wait for it (inside an async function)
const result = await sample('First Example').toPromise();
// output: 'From Promise: First Example'
console.log('From Promise:', result);

Read more here.

Babiche answered 3/1, 2020 at 16:3 Comment(2)
toPromise is deprecated, and it has been removed in RxJS 8. The docs in your link are outdated. - Formality
What does Rx mean? - Lewse
12

You will need to await a promise, so you will want to use toPromise(). See this for more details on toPromise().

Stronghold answered 9/12, 2015 at 23:52 Comment(0)
5

Using toPromise() is not recommended, as it is deprecated from RxJS 7 onwards. You can use the two new functions available in RxJS 7, lastValueFrom() and firstValueFrom(). More details can be found here.

const result = await lastValueFrom(myObservable$);

Implementations in Beta version are available here:

Amphiprostyle answered 1/2, 2021 at 9:35 Comment(0)
0

I am using RxJS 6.4.0, so I have to use toPromise(), even though it is deprecated in 7.x.x. Inspired by other answers, here is what I did with toPromise():

import { first, ... } from 'rxjs/operators';

...

if (condition) {
  await observable$.pipe(first()).toPromise();
}

...

Note how I used first() inside a pipe(), because in my version observable.first() does not work, as mentioned by macil.

Hope this helps others who are using RxJS 6.x.x as I do :).

Thanks.

Detta answered 19/12, 2021 at 7:11 Comment(0)
