RxJS: Producer-consumer with abort

I've got a special producer-consumer problem in RxJS: the producer slowly produces elements. A consumer requests elements and often has to wait for the producer. This can be achieved by zipping the producer and the request stream:

var produce = getProduceStream();
var request = getRequestStream();

var consume = Rx.Observable.zipArray(produce, request).pluck(0);

Sometimes a request gets aborted. A produced element should only be consumed by a request that has not been aborted:

produce:  -------------p1-------------------------p2--------->
request:  --r1--------------r2---------------r3-------------->
abort:    ------a(r1)------------------a(?)------------------>
consume:  ------------------c(p1, r2)-------------c(p2, r3)-->

The first request r1 would consume the first produced element p1, but r1 is aborted by a(r1) before p1 is produced. Once p1 is produced, it is consumed as c(p1, r2) on the second request r2. The second abort a(?) is ignored because there is no unanswered request before it. The third request r3 has to wait for the next produced element p2 and is not aborted before p2 is produced. Thus, p2 is consumed as c(p2, r3) immediately after it is produced.
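To pin down the behaviour described above, here is a minimal plain-JavaScript sketch (no RxJS) that replays the diagram as an ordered list of events. The function name `simulateConsume`, the `{type, value}` event shape, and the rule that an abort cancels the oldest waiting request are assumptions made for illustration; they are not part of the question or of any RxJS API.

```javascript
// Hypothetical model of the desired semantics; not an RxJS implementation.
function simulateConsume(events) {
  const products = [];  // produced elements not yet consumed
  const pending = [];   // requests still waiting for an element
  const consumed = [];  // [product, request] pairs in consumption order

  for (const e of events) {
    if (e.type === 'produce') {
      products.push(e.value);
    } else if (e.type === 'request') {
      pending.push(e.value);
    } else if (e.type === 'abort') {
      // Assumption: an abort cancels the oldest waiting request.
      // With nothing waiting, shift() is a no-op, so the abort is ignored.
      pending.shift();
    }
    // Match as soon as both a product and a live request exist.
    while (products.length && pending.length) {
      consumed.push([products.shift(), pending.shift()]);
    }
  }
  return consumed;
}

// The timeline from the diagram, in order of occurrence.
const timeline = [
  {type: 'request', value: 'r1'},
  {type: 'abort',   value: 'a1'},
  {type: 'produce', value: 'p1'},
  {type: 'request', value: 'r2'},
  {type: 'abort',   value: 'a2'},  // stray abort, no request is waiting
  {type: 'request', value: 'r3'},
  {type: 'produce', value: 'p2'},
];

console.log(simulateConsume(timeline));
// → [ [ 'p1', 'r2' ], [ 'p2', 'r3' ] ]
```

A solution in RxJS should emit p1 and p2 at the same points in this sequence.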

How can I achieve this in RxJS?

Edit: I created an example with a QUnit test on jsbin. You can edit the function createConsume(produce, request, abort) to try/test your solution.

The example contains the function definition of the previously accepted answer.

Impost answered 26/2, 2015 at 17:22 Comment(3)
Nice problem. Here is my attempt, but @Brandon's is way better.Bartram
Btw. I noticed some small issues with your tests: 1. you produce 1, 2, but expect: p1, p2; 2. you once swapped expected and actual.Bartram
I'm sorry. I built this test in a hurry and shared the jsbin example the wrong way. Thus, I accidentally changed it later while trying my own approaches. I changed the elements back to their original values: produce: p1 and p2; request: r1, r2 and r3; abort: a1.Impost
I
0

This solution ignores aborts that don't follow an unanswered request:

const {merge} = Rx.Observable;

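Rx.Observable.prototype.wrapValue = function(wrapper) {
    wrapper = (wrapper || {});
    return this.map(function (value) {
        // copy the wrapper so every event gets its own object
        // instead of mutating one shared instance
        return Object.assign({}, wrapper, {value: value});
    });
};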

function createConsume(produce, request, abort) {
  return merge(
            produce.wrapValue({type: 'produce'}),
            request.wrapValue({type: 'request'}),
            abort.wrapValue({type: 'abort'})
         )
         .scan(
            [false, []],
            ([isRequest, products], e) => {
                // if last time the request was answered
                if (isRequest && products.length) {
                    // remove consumed product
                    products.shift();
                    // mark request as answered
                    isRequest = false;
                }
                if (e.type === 'produce') {
                    // save product to consume later
                    products.push(e.value);
                } else {
                    // if evaluated to false, e.type === 'abort'
                    isRequest = (e.type === 'request');
                }
                return [isRequest, products];
            }
         )
         .filter( ([isRequest, products]) => (isRequest && products.length) )
         .map( ([isRequest, products]) => products[0] ); // consume
}

Code in newest test on JSBin.

Impost answered 12/3, 2015 at 9:54 Comment(0)
T
3

This (core idea minus details) passes your JSBin test:

var consume = request
  .zip(abort.merge(produce), (r,x) => [r,x])
  .filter(([r,x]) => isNotAbort(x))
  .map(([r,p]) => p);

And the JSBin code.
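As a rough way to reason about the pairing, the sketch below models it in plain JavaScript without RxJS: `zip` matches the n-th request with the n-th element of the merged abort/produce stream, whatever the timing. The name `zipModel` and the `{type, value}` event shape are hypothetical, and the `type !== 'abort'` check merely stands in for `isNotAbort`.

```javascript
// Hypothetical model of index-based zip pairing; not the RxJS operator itself.
function zipModel(requests, abortOrProduce) {
  const n = Math.min(requests.length, abortOrProduce.length);
  const out = [];
  for (let i = 0; i < n; i++) {
    const x = abortOrProduce[i];           // paired with requests[i]
    if (x.type !== 'abort') out.push(x.value);
  }
  return out;
}

const requests = ['r1', 'r2', 'r3'];

// Every abort follows an unanswered request: r1 is spent on the abort.
console.log(zipModel(requests, [
  {type: 'abort',   value: 'a1'},
  {type: 'produce', value: 'p1'},
  {type: 'produce', value: 'p2'},
]));
// → [ 'p1', 'p2' ]

// A stray abort also takes a request slot: r3 is paired with it,
// so p2 would have to wait for a fourth request.
console.log(zipModel(requests, [
  {type: 'abort',   value: 'a1'},
  {type: 'produce', value: 'p1'},
  {type: 'abort',   value: 'a2'},
  {type: 'produce', value: 'p2'},
]));
// → [ 'p1' ]
```

Under this model, the approach only fits when each abort is known to follow an unanswered request.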

Tart answered 27/2, 2015 at 2:20 Comment(6)
Thank you for your solution, although it doesn't behave the way I'd like. I edited my question to make the desired behaviour clearer: I edited the diagram, added a description, and added a link to a test example/environment on jsbin. I used your solution in that example. It fails because it skips the first produced element in favor of the second element, which gets consumed twice.Impost
Updated my answer. Does this solve all your cases? @MegaMuetzenMikeGotthelf
Clever. I knew there had to be a way to do it with existing operators.Crotty
It's essentially "request combined with the corresponding [abort or produce], ignoring when the result was an abort".Gotthelf
It is so terse and elegant.Bartram
In the meantime, I realized that it would be much easier to use if aborts were ignored when no unanswered request precedes them.Impost
C
2

I can't quite wrap my brain around how to do it with existing operators. Here's how to do it with Observable.create():

return Rx.Observable.create(function (observer) {
  var rsub = new Rx.SingleAssignmentDisposable();
  var asub = new Rx.SingleAssignmentDisposable();
  var psub = new Rx.SingleAssignmentDisposable();
  var sub = new Rx.CompositeDisposable(rsub, asub, psub);
  var rq = [];
  var pq = [];
  var completeCount = 0;
  var complete = function () {
    if (++completeCount === 2) {
      observer.onCompleted();
    }
  };
  var consume = function () {
    if (pq.length && rq.length) {
      var p = pq.shift();
      var r = rq.shift();
      observer.onNext('p' + p);
    }
  };

  rsub.setDisposable(request.subscribe(
    function (r) {
      rq.push(r);
      consume();
    },
    function (e) { observer.onError(e); },
    complete));

  asub.setDisposable(abort.subscribe(
    function (a) {
      rq.shift();
    },
    function (e) { observer.onError(e); }
  ));

  psub.setDisposable(produce.subscribe(
    function (p) {
      pq.push(p);
      consume();
    },
    function (e) { observer.onError(e); },
    complete));


  return sub;
});

http://jsbin.com/zurepesijo/1/

Crotty answered 27/2, 2015 at 17:1 Comment(2)
Nice approach. Just would like to confirm: does calling sub.dispose() dispose all Rx.SingleAssignmentDisposables?Bartram
Yes CompositeDisposable lets you group multiple disposables into a single disposable.Crotty