Can RxJS be used in a pull-based way?

The examples in the RxJS README seem to suggest we have to subscribe to a source. In other words: we wait for the source to send events. In that sense, sources seem to be push-based: the source decides when it creates new items.

This contrasts, however, with iterators, where strictly speaking new items need only be created when requested, i.e., when a call is made to next(). This is pull-based behavior, also known as lazy generation.

For instance, a stream could return all Wikipedia pages for prime numbers. The items are only generated when you ask for them, because generating all of them up front is quite an investment, and perhaps only 2 or 3 of them will be read anyway.
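A minimal synchronous sketch of that lazy behaviour, using an ES6 generator, follows. The names `primes`, `computed`, and `primeIter` are illustrative, and the expensive Wikipedia lookup is only marked by a comment; the point is that no prime beyond the ones requested is ever computed.

```javascript
// Counts how many primes have actually been produced, to make the laziness visible.
let computed = 0;

// Yields primes one at a time; the body only advances when next() is called.
function* primes() {
  for (let n = 2; ; n++) {
    let isPrime = true;
    for (let d = 2; d * d <= n; d++) {
      if (n % d === 0) {
        isPrime = false;
        break;
      }
    }
    if (isPrime) {
      computed++;
      // A real version might fetch the Wikipedia page for n here.
      yield n;
    }
  }
}

// Pull only the first three items; the infinite generator is never exhausted.
const primeIter = primes();
const firstThree = [primeIter.next().value, primeIter.next().value, primeIter.next().value];
console.log(firstThree, computed); // [ 2, 3, 5 ] 3
```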

Can RxJS also have such pull-based behavior, so that new items are only generated when you ask for them?

The page on backpressure seems to indicate that this is not possible yet.

Centralia answered 11/11, 2015 at 22:47 Comment(2)
I'll let the specialists answer, but just a quick note: a call to next() does not imply that new items are only created when necessary, as you say; it just means that they are requested (and provided, since iterators are synchronous) at that time. A typical example is an iterator over an array: the array (and the values in it) already existed before you called the iterator to enumerate its values. You can think of the array as a buffer holding the values, and your next() call acts as a controlled Rx.Observable would with request(1). – Cistaceous
Exactly. I've rephrased the wording slightly to reflect this. However, I wouldn't say that iterators are typically based on arrays; the more interesting use cases are those where the iterator is not array-based. – Centralia

Short answer is no.

RxJS is designed for reactive applications, so, as you already mentioned, if you need pull-based semantics you should be using an Iterator instead of an Observable. Observables are designed to be the push-based counterparts to iterators, so, algorithmically speaking, they occupy different spaces.
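The push/pull duality can be sketched without RxJS. In the pull half, the consumer calls `next()` on a standard iterator; in the push half, a hypothetical, stripped-down observable (`createObservable` is an illustrative name, not an RxJS API) lets the producer call `observer.next()` while the consumer only reacts.

```javascript
// Pull: the consumer decides when to ask for the next value.
function pullAll(iterable) {
  const received = [];
  const iterator = iterable[Symbol.iterator]();
  for (let step = iterator.next(); !step.done; step = iterator.next()) {
    received.push(step.value);
  }
  return received;
}

// Push: a hypothetical minimal observable; the producer decides when values arrive.
function createObservable(produce) {
  return {
    subscribe(observer) {
      produce(observer);
    },
  };
}

const numbers = [1, 2, 3];
const pulled = pullAll(numbers);

const pushed = [];
createObservable((observer) => {
  // The producer drives the flow by calling next() on the consumer.
  for (const n of numbers) observer.next(n);
  observer.complete();
}).subscribe({
  next: (value) => pushed.push(value),
  complete: () => console.log("pull:", pulled, "push:", pushed),
});
```

Both halves end up with the same values; the difference is only which side controls the timing.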

Obviously, I can't say this will never happen, because that is something the community will decide. But as far as I know, (1) the semantics for this case just aren't that good, and (2) this runs counter to the idea of reacting to data.

A pretty good synopsis can be found here. It is written for Rx.NET, but the concepts apply similarly to RxJS.

Tourbillion answered 12/11, 2015 at 1:14 Comment(1)
Thanks. One clarification, however: iterators are appropriate only for synchronous pull-based scenarios. Despite the comparison table, I would not call Observable the exact asynchronous counterpart of Iterable, since iterators are pull-based but the RxJS objects are push-based. – Centralia

The controlled observable from the page you referenced can turn a push observable into a pull-based one:

var controlled = source.controlled();

// this callback will only be invoked after controlled.request()
controlled.subscribe(n => {
  log("controlled: " + n);
  // do some work, then signal for next value
  setTimeout(() => controlled.request(1), 2500);
});

controlled.request(1);
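To see what `request(n)` implies, here is a plain-JavaScript sketch of the same idea. `controlledBuffer` is a hypothetical helper written for illustration, not RxJS's implementation of `controlled()`: the source pushes every value into a queue, and values leave the queue only as the consumer grants credit with `request(n)`. Note that the source keeps producing regardless; only delivery is controlled.

```javascript
// Hypothetical helper: buffers pushed values and releases them only on request(n).
function controlledBuffer(onValue) {
  const queue = [];
  let credit = 0;

  // Deliver as many queued values as the outstanding credit allows.
  function drain() {
    while (credit > 0 && queue.length > 0) {
      credit--;
      onValue(queue.shift());
    }
  }

  return {
    // Called by the push-based source for every value it emits.
    push(value) {
      queue.push(value);
      drain();
    },
    // Called by the consumer to signal it is ready for n more values.
    request(n) {
      credit += n;
      drain();
    },
    // Number of values waiting; grows whenever the source outpaces the requests.
    get buffered() {
      return queue.length;
    },
  };
}

const delivered = [];
const buffer = controlledBuffer((v) => delivered.push(v));

// The source pushes five values regardless of demand.
[0, 1, 2, 3, 4].forEach((v) => buffer.push(v));
buffer.request(1); // delivers 0
buffer.request(2); // delivers 1 and 2
console.log(delivered, buffer.buffered); // [ 0, 1, 2 ] 2
```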

A truly synchronous iterator is not possible, as it would block when the source was not emitting.
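An asynchronous iterator, on the other hand, is possible: `next()` returns a promise that settles once the push source has emitted, so nothing blocks. Below is a minimal sketch of such an adapter; `pushToAsyncIterator`, its `emit`/`end` methods, and `collect` are hypothetical names, and values emitted before anyone asks for them are simply queued.

```javascript
// Hypothetical adapter: turns push-style emit() calls into a pull-style async iterator.
function pushToAsyncIterator() {
  const values = []; // values pushed before anyone asked for them
  const waiting = []; // resolvers for next() calls made before a value arrived
  let ended = false;

  return {
    // Called by the push source for each new value.
    emit(value) {
      if (waiting.length > 0) waiting.shift()({ value, done: false });
      else values.push(value);
    },
    // Called by the push source when it has no more values.
    end() {
      ended = true;
      while (waiting.length > 0) waiting.shift()({ value: undefined, done: true });
    },
    [Symbol.asyncIterator]() {
      return this;
    },
    // Called by the consumer; it never blocks, it returns a promise instead.
    next() {
      if (values.length > 0) {
        return Promise.resolve({ value: values.shift(), done: false });
      }
      if (ended) return Promise.resolve({ value: undefined, done: true });
      return new Promise((resolve) => waiting.push(resolve));
    },
  };
}

// Usage: a timer pushes three values; a for-await loop pulls them as they arrive.
const source = pushToAsyncIterator();
let tick = 0;
const timer = setInterval(() => {
  source.emit(tick++);
  if (tick === 3) {
    clearInterval(timer);
    source.end();
  }
}, 10);

async function collect(iterable) {
  const out = [];
  for await (const value of iterable) out.push(value);
  return out;
}

const collected = collect(source);
collected.then((result) => console.log(result)); // logs [ 0, 1, 2 ]
```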

In the snippet below, the controlled subscriber only gets a single item when it signals, and it does not skip any values.

var output = document.getElementById("output");
var log = function(str) {
  output.value += "\n" + str;
  output.scrollTop = output.scrollHeight;
};

var source = Rx.Observable.timer(0, 1000);
source.subscribe(n => log("source: " + n));

var controlled = source.controlled();
// this callback will only be invoked after controlled.request()
controlled.subscribe(n => {
  log("controlled: " + n);
  // do some work, then signal for next value
  setTimeout(() => controlled.request(1), 2500);
});
controlled.request(1);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>

<body>
  <textarea id="output" style="width:150px; height: 150px"></textarea>
</body>
Jurdi answered 29/3, 2016 at 11:26 Comment(10)
But in this case a buffer is building up in the source, right? That is, the source can still generate many more items than the consumer can handle. I'm looking for a solution that only creates new items when requested. – Centralia
A cold observable (hot vs cold observables) will not start pushing values until told to. If that doesn't fit what you're looking for, example code would help to clarify the question. – Jurdi
It's interesting to know about hot and cold observables, but it's still not really what I'm looking for. I've added an example to the question (but not code, as the question is to find out what such code would look like). In summary: I'm looking for a lazy stream that only generates elements on demand. A cold observable can be "paused" at the beginning, but then emits all elements anyway. – Centralia
If you are looking to directly control the generation of elements, then an observable is not what you are looking for. The whole point of the Observer pattern is that the subscribers (observers) do not control the action of the publisher; they only listen. There are various ways to achieve async iterators, ES6 generators being the most obvious. A compiler like Babel lets you use generators in your code today. – Jurdi
Indeed, if RxJS only offers Observer, then it's not what I'm looking for. What I need is an async version of ES6 generators. ES6 generators cannot do async, though, as you can see from this schema: reactivex.io/intro.html. In that sense, I find it misleading that they present the Observer pattern as the async alternative to generators, which it is not, since on-demand generation is not possible. – Centralia
You seem to be confusing iterators with generators. ES6 generators are the async version of iterators. For a generator function *gen() { yield 'a'; yield 'b'; }, the second yield will not be evaluated unless you call next() twice on the generator returned by gen(). After the first yield, control is immediately handed back to the caller. – Jurdi
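The pausing behaviour described in this comment can be observed by recording events inside the generator body. In this sketch the `log` array is only an illustrative device; note that every step is synchronous, since each `next()` call returns as soon as the body reaches a `yield`.

```javascript
const log = [];

function* gen() {
  log.push("start");
  yield "a";
  log.push("after a");
  yield "b";
  log.push("after b");
}

const g = gen();
log.push("created"); // the body has not run yet
log.push(g.next().value); // runs up to the first yield
log.push(g.next().value); // resumes and runs up to the second yield
console.log(log);
// [ 'created', 'start', 'a', 'after a', 'b' ]
```

"after b" is only recorded if `next()` is called a third time.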
That's not async; that's pausing execution by returning control earlier. Async is necessary when elements need time to generate, and you don't want to actively wait on them. For example, take a scenario with an iterator that returns the content of each page in Wikipedia. The iterator needs to perform a JSON request, give back control, and only when the content has been downloaded, return the page. If you call next 100 times in a row, you can't expect to receive 100 pages already, because they won't be ready yet. That's the kind of thing I want to do. (Wrote my own lib in the meantime BTW.) – Centralia
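JavaScript has since gained async generators (`async function*`, standardized in ES2018), which combine both properties discussed in this thread: work on an item starts only when the consumer asks for it, and `next()` returns a promise instead of blocking. In this sketch `fetchPage`, `pages`, and `readFirst` are hypothetical names; `fetchPage` stands in for a real HTTP request and merely waits a few milliseconds, while `started` records which pages were actually requested.

```javascript
const started = [];

// Hypothetical stand-in for a real download (e.g., a JSON request to Wikipedia).
function fetchPage(title) {
  started.push(title);
  return new Promise((resolve) => setTimeout(() => resolve(`content of ${title}`), 5));
}

// Async generator: each page is fetched only when the consumer asks for it.
async function* pages(titles) {
  for (const title of titles) {
    yield await fetchPage(title);
  }
}

// A consumer that pulls pages one at a time and stops after n of them.
async function readFirst(n, titles) {
  const results = [];
  for await (const page of pages(titles)) {
    results.push(page);
    if (results.length === n) break; // breaking also closes the generator
  }
  return results;
}
```

Calling `readFirst(2, titles)` starts exactly two downloads, however long `titles` is, because the generator never advances past the last requested page.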
This example is helpful to make the answer more specific. This is indeed a good application for RxJS, because the downloader can push the pages to you when each is ready. You don't want to pull the pages synchronously at your command; you want to receive each one when it is downloaded. The 'iterator' you mention in this example is really a stream/observable: wikiPageObservable.subscribe(callback). There are some great RxJS examples out there of this exact scenario. – Jurdi
Thanks for your help. I really want to pull them, though :-) The consumer might be much slower than the producer (for example, if it applies detailed processing to each page). In other words, I'm interested in scenarios where the consumer is slower than the producer. The consumer wants to pull when both the consumer and the producer are ready, so the producer must not create too many items. My main reason for asking is that I have developed such a library, which I will release soon. – Centralia
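The work-queue behaviour described here, where the producer may run ahead of a slow consumer but only by a bounded amount, can be sketched on top of an async iterator. `prefetch`, `producer`, and `consumeSlowly` are hypothetical names for illustration, not RxJS operators: `prefetch` keeps at most `limit` `next()` requests outstanding, so the producer can never get more than `limit` items ahead.

```javascript
// Hypothetical helper: reads ahead from an async iterable, but never more than `limit` items.
async function* prefetch(iterable, limit) {
  if (limit < 1) throw new RangeError("limit must be at least 1");
  const iterator = iterable[Symbol.asyncIterator]();
  const ahead = []; // pending next() promises not yet handed to the consumer
  try {
    while (true) {
      // Top up the look-ahead window; the producer may work on these in the background.
      while (ahead.length < limit) ahead.push(iterator.next());
      const step = await ahead.shift();
      if (step.done) return;
      yield step.value;
    }
  } finally {
    // Let the producer clean up if the consumer stops early.
    if (typeof iterator.return === "function") await iterator.return();
  }
}

// Demo producer that counts how many items it has created so far.
let produced = 0;
async function* producer(count) {
  for (let i = 0; i < count; i++) {
    produced++;
    yield i;
  }
}

// Demo consumer that is slower than the producer.
async function consumeSlowly(limit) {
  const seen = [];
  let maxAhead = 0;
  for await (const value of prefetch(producer(10), limit)) {
    await new Promise((resolve) => setTimeout(resolve, 5)); // simulate slow processing
    seen.push(value);
    maxAhead = Math.max(maxAhead, produced - seen.length);
  }
  return { seen, maxAhead };
}
```

With `limit` set to 1 the pipeline is strictly on demand; larger values let the producer work while the consumer is busy, at the cost of holding a few extra items in memory.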
Got it. Like a work queue, where the producer watches to see that its output queue does not get too deep. Good luck. – Jurdi

I'm quite late to the party, but it's actually very simple to combine generators with observables. You can pull values from a generator by syncing it with a source observable, taking one value per emission:

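import { interval } from 'rxjs'
import { map } from 'rxjs/operators'

const fib = fibonacci()
interval(500).pipe(
  // each tick pulls exactly one value from the generator
  map(() => fib.next().value)
)
  .subscribe(console.log)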

Generator implementation for reference:

function* fibonacci() {
  let v1 = 1
  let v2 = 1
  while (true) {
    const res = v1
    v1 = v2
    v2 = v1 + res
    yield res
  }
}

Neddie answered 11/11, 2021 at 20:39 Comment(0)
