RxJS Promise Composition (passing data)

I'm brand new to Rx and am finding it difficult to find documentation on composing promises such that data from the first promise is passed into the second, and so on. Here are three very basic promises. The calculations on the data aren't important, only that something async has to be done using data from the previous promise.

 const p1 = () => Promise.resolve(1);
 const p2 = x => { const val = x + 1; return Promise.resolve(val); };
 const p3 = x => {
      const isEven = x => x % 2 === 0;
      return Promise.resolve(isEven(x));
 };

The traditional way to achieve the composition I'm talking about:

 p1().then(p2).then(p3).then(console.log);

My favorite implementation is Ramda's composeP and pipeP:

R.pipeP(p1, p2, p3, console.log)()
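For reference, the behaviour being asked for can be sketched without any library. This is a minimal stand-in written for illustration, not Ramda's actual implementation; the helper name pipeP is borrowed only to mirror the call above, and p1, p2 and p3 are repeated in condensed form so the block runs on its own.

```javascript
// The three promise-returning functions from the question (condensed).
const p1 = () => Promise.resolve(1);
const p2 = x => Promise.resolve(x + 1);
const p3 = x => Promise.resolve(x % 2 === 0);

// Hypothetical helper: left-to-right composition of promise-returning functions.
// The first function receives the call arguments; each later function receives
// the resolved value of the one before it.
const pipeP = (first, ...rest) => (...args) =>
  rest.reduce((acc, fn) => acc.then(fn), Promise.resolve(first(...args)));

// 1 -> 2 -> isEven(2), so this logs true.
pipeP(p1, p2, p3)().then(result => console.log(result));
```

The reduce over `.then` is the same shape as the hand-written `p1().then(p2).then(p3)` chain, which is why an equivalent reduce over an Rx operator is a natural thing to look for.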

It seems likely Rx might be able to handle this kind of situation pretty fluently. However, the closest I've found so far is from the RxJS to async (library) comparison here https://github.com/Reactive-Extensions/RxJS/blob/master/doc/mapping/async/comparing.md:

 var Rx = require('rx'),
     fs = require('fs'),
     path = require('path');
 var file = path.join(__dirname, 'file.txt'),
     dest = path.join(__dirname, 'file1.txt'),
     exists = Rx.Observable.fromCallback(fs.exists),
     rename = Rx.Observable.fromNodeCallback(fs.rename),
     stat = Rx.Observable.fromNodeCallback(fs.stat);
 exists(file)
    .concatMap(function (flag) {
     return flag ?
         rename(file, dest) :
         Rx.Observable.throw(new Error('File does not exist.'));
    })
    .concatMap(function () {
        return stat(dest);
    })
   .forEach(
      function (fsStat) {
          console.log(JSON.stringify(fsStat));
      },
      function (err) {
          console.log(err);
      }
    );

concatMap seems promising, but the above code looks pretty horrific. I was also having trouble with my example because Rx.Observable.fromPromise(p1) won't work as it expects a promise itself, not a function, and Rx.Observable.defer(p1) doesn't seem to pass parameters like the example.

Thanks!

Similar question but without data passing: Chaining promises with RxJS

Bicameral answered 10/1, 2016 at 2:15 Comment(3)
Do your promises have to be wrapped in a function? - Chairborne
Only in that if you defined a promise inline, outside of a Promise chain or observable, with something like const p1 = new Promise((resolve, reject) => {}), it would begin evaluating immediately and couldn't receive data from the previously executed promise. Or am I wrong about the immediate evaluation? - Bicameral
Maybe helpful: an example of chaining promises using rxjs6 - https://mcmap.net/q/183329/-rxjs-sequence-equivalent-to-promise-then - Ligate
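
On the immediate-evaluation point raised in the comments above, a small sketch may help. It uses plain promises only; the names eager, lazy and log are made up for this illustration.

```javascript
const log = [];

// The executor passed to `new Promise` runs synchronously, at construction time.
const eager = new Promise(resolve => {
  log.push('eager executor ran');
  resolve(1);
});

// Wrapping the construction in a function delays it until the function is
// called, and lets the caller pass in data from an earlier step.
const lazy = x => new Promise(resolve => {
  log.push('lazy executor ran with ' + x);
  resolve(x + 1);
});

// At this point only the eager executor has run.
console.log(log);

// The lazy executor runs later, once `eager` hands it a value.
eager.then(lazy).then(() => console.log(log));
```

This is why the functions in the question are written as thunks: a bare promise has already started by the time it could be placed in a chain.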

I did not read all of it, but if you want to achieve the same as p1().then(p2).then(p3).then(console.log);, with each p being a function that returns a promise, you could do something like this:

Rx.Observable.fromPromise(p1())
             .flatMap(function(p1_result){return p2(p1_result);})
             .flatMap(function(p2_result){return p3(p2_result);})

Or, more symmetrically:

 var chainedPromises$ = 
     Rx.Observable.just()
             .flatMap(p1)
             .flatMap(p2)
             .flatMap(p3);

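Now, if you want to sequentially execute callbacks wrapped with fromCallback or fromNodeCallback, you could do something like:

// file, dest, exists, rename and stat are the ones defined in the question.
// Named renameIfExists so it does not shadow (and recurse into) the wrapped rename.
function renameIfExists (flag) {
  return flag
          // rename's own result is not needed; pass dest on so stat receives it
          ? rename(file, dest).flatMap(function () { return Rx.Observable.just(dest); })
          : Rx.Observable.throw(new Error('File does not exist.'));
}

Rx.Observable.just(file)
             .flatMap(exists)
             .flatMap(renameIfExists)
             .flatMap(stat)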

The latter code is untested, so let me know if it works. One last comment: this works as long as only one value is produced at each step (as with a promise). If you had several files instead of one, flatMap could emit results out of order; if order matters to you, use concatMap instead.

Chairborne answered 10/1, 2016 at 3:59 Comment(5)
I was somewhat hoping for a slightly higher abstraction, something like flatMapAll(p1, p2, p3). That would be particularly helpful if a sequence of promises is generated via a map, e.g. const ps = map((x) => promisedFsReadFileCurriedSoThatItDoesSomethingWithPreviousFileData(x), ['1.txt','2.txt','3.txt']); Rx.Observable.just().flatMapAll(...ps); (just pseudo code). But this is definitely a manageable solution, and there's likely a way to do this by mapping over fromPromise or something. Thanks! - Bicameral
I didn't test the second code sample either, but the first ones work like a charm. - Bicameral
You can write flatMapAll yourself. flatMapAll :: Rx.Observable -> [a -> a] -> Rx.Observable. flatMapAll = (source, fn_array) -> fn_array.reduce((acc, fn) -> acc.flatMap(fn), source). In JS, Rx.Observable.prototype.flatMapAll = function (fn_array) {source = this; return ...} - Chairborne
Yep, I was coming to something similar. Here's what I wrote: const flatMapAll = (...fns) => fns.reduce((acc, fn) => acc.flatMap(fn), Rx.Observable.just()); flatMapAll(p1, p2, p3).subscribe(console.log). Thanks for the help. Also, what language is the first example in? It looks pretty close to Haskell. - Bicameral
It is pseudo code, but yes, the syntax is inspired by Haskell, at least the type declaration. - Chairborne
