Turning RxJS Observable into an asynchronous iterable

Sometimes I'd like to use RxJS operators to manipulate an endless asynchronous iterable without buffering the values. It is easy to turn an iterable into an Observable. Are there downsides to the following approach for turning an Observable into an asynchronous iterable?

const iterable = async function* (observable) {
  let buffer = [],
    resolve,
    reject;
  const subscription = observable.subscribe({
    next: value => {
      if (resolve) {
        resolve(value);
        resolve = reject = undefined;
      } else {
        buffer.push(Promise.resolve(value));
      }
    },
    error: e => {
      if (reject) {
        reject(e);
        resolve = reject = undefined;
      }
    },
    complete: () => {},
  });
  while (!subscription.isStopped || buffer.length) {
    yield buffer.shift() ||
      new Promise((_resolve, _reject) => {
        resolve = _resolve;
        reject = _reject;
      });
  }
  subscription.unsubscribe();
};

Codepen demo
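
One possible downside of the generator above: if the source completes while the consumer is waiting on the pending promise, that promise is never settled, and an error that arrives while nobody is waiting is dropped. A minimal sketch of a variant that queues every notification instead, so none is lost. The names `toAsyncIterable`, `demoSource` and `collect` are hypothetical; `demoSource` is only a stand-in with the same `subscribe()` shape as an RxJS Observable, so that the sketch runs without any library.

```javascript
// Sketch: queue every notification (value, error, completion) so none is lost.
// Accepts any object whose subscribe({ next, error, complete }) returns
// { unsubscribe }, which is the shape an RxJS Observable has.
async function* toAsyncIterable(observable) {
  const queue = []; // notifications not yet consumed
  let wake;         // resolver of the promise the consumer is waiting on

  const push = notification => {
    queue.push(notification);
    if (wake) {
      wake();
      wake = undefined;
    }
  };

  const subscription = observable.subscribe({
    next: value => push({ kind: 'next', value }),
    error: error => push({ kind: 'error', error }),
    complete: () => push({ kind: 'complete' }),
  });

  try {
    while (true) {
      if (queue.length === 0) {
        // Sleep until the source pushes something.
        await new Promise(resolve => (wake = resolve));
      }
      const notification = queue.shift();
      if (notification.kind === 'complete') return;
      if (notification.kind === 'error') throw notification.error;
      yield notification.value;
    }
  } finally {
    // Runs on completion, on error, and when the consumer leaves the loop early.
    subscription.unsubscribe();
  }
}

// Hypothetical stand-in for an Observable: emits 1, 2, 3 asynchronously, then completes.
const demoSource = {
  subscribe(observer) {
    let cancelled = false;
    (async () => {
      for (const value of [1, 2, 3]) {
        await new Promise(resolve => setTimeout(resolve, 10));
        if (cancelled) return;
        observer.next(value);
      }
      observer.complete();
    })();
    return { unsubscribe: () => { cancelled = true; } };
  },
};

// Helper: drain an async iterable into an array.
async function collect(iterable) {
  const values = [];
  for await (const value of iterable) values.push(value);
  return values;
}

collect(toAsyncIterable(demoSource)).then(values => console.log(values));
```

The queue is still unbounded: an Observable pushes values whether or not the consumer is ready, so a fast source combined with a slow `for await` loop keeps growing it. That trade-off applies to any conversion of this kind, not just this sketch.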


Here is an alternative implementation of the iterable as an Observer (without using a generator).

class IterableObserver {
  constructor(observable) {
    this.buffer = [];
    this.resolve = undefined;
    this.reject = undefined;
    this.isStopped = false;
    observable && observable.subscribe(this);
  }

  [Symbol.asyncIterator]() {
    const t = this;
    return {
      next: async () => {
        if (!t.isStopped || t.buffer.length) {
          if (t.buffer.length) {
            return {
              value: t.buffer.shift(),
            };
          } else {
            return new Promise((_resolve, _reject) => {
              t.resolve = _resolve;
              t.reject = _reject;
            });
          }
        } else {
          return { done: true };
        }
      },
    };
  }

  next(value) {
    if (this.resolve) {
      this.resolve({ value });
      this.resolve = this.reject = undefined;
    } else {
      this.buffer.push(value);
    }
  }
  error(e) {
    this.isStopped = true;
    if (this.reject) {
      this.reject(e);
      this.resolve = this.reject = undefined;
    }
  }
  complete() {
    this.isStopped = true;
  }
}
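
Two things to watch for in the class above: `complete()` does not settle a promise that a consumer is already waiting on, and the iterator has no `return()` method, so leaving a `for await` loop early keeps the source subscribed. Below is a sketch of a hand-written iterator that covers both cases. It is an illustration under assumptions rather than a drop-in replacement: `iterableFrom`, `ticker` and `firstTwo` are hypothetical names, and `ticker` merely imitates the `subscribe()` shape of an Observable.

```javascript
// Sketch: an async iterable (no generator) that settles waiting consumers on
// completion and unsubscribes when the consumer stops early.
function iterableFrom(observable) {
  return {
    [Symbol.asyncIterator]() {
      const values = [];  // values received but not yet requested
      const waiting = []; // { resolve, reject } of next() calls not yet answered
      let finished = false;
      let failure;        // set to { error } if the source errors while nobody waits
      const end = () => ({ value: undefined, done: true });

      // Answer every waiting next() call with "done".
      const releaseWaiting = () => {
        for (const call of waiting.splice(0)) call.resolve(end());
      };

      const subscription = observable.subscribe({
        next(value) {
          if (waiting.length) waiting.shift().resolve({ value, done: false });
          else values.push(value);
        },
        error(error) {
          finished = true;
          if (waiting.length) waiting.shift().reject(error);
          else failure = { error };
          releaseWaiting();
        },
        complete() {
          finished = true;
          releaseWaiting();
        },
      });

      return {
        next() {
          if (values.length) {
            return Promise.resolve({ value: values.shift(), done: false });
          }
          if (failure) {
            const { error } = failure;
            failure = undefined;
            return Promise.reject(error);
          }
          if (finished) return Promise.resolve(end());
          return new Promise((resolve, reject) => waiting.push({ resolve, reject }));
        },
        // Called by for await...of when the loop exits early (break, return, throw).
        return() {
          finished = true;
          values.length = 0;
          subscription.unsubscribe();
          releaseWaiting();
          return Promise.resolve(end());
        },
      };
    },
  };
}

// Hypothetical endless source with the subscribe() shape of an Observable.
function ticker(periodMs) {
  return {
    active: 0, // number of live subscriptions, for demonstration only
    subscribe(observer) {
      let count = 0;
      this.active++;
      const timer = setInterval(() => observer.next(count++), periodMs);
      return {
        unsubscribe: () => {
          clearInterval(timer);
          this.active--;
        },
      };
    },
  };
}

// Take two values from an endless source, then leave the loop.
async function firstTwo(source) {
  const seen = [];
  for await (const value of iterableFrom(source)) {
    seen.push(value);
    if (seen.length === 2) break; // triggers return(), which unsubscribes
  }
  return seen;
}
```

Unlike the class, this sketch subscribes when iteration starts rather than in a constructor, so each `for await` loop gets its own subscription. Whether that is desirable depends on whether the source is meant to be shared.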

The benefit of this was questioned in the comments. Let's say you have an API that provides an asynchronous iterable of text file lines through the function makeTextFileLineIterator, and you need to provide your client with an asynchronous iterable of the first N lines in uppercase. How would you do that? With the RxJS operators and the iterable conversion it is easy:

const lineIterator = makeTextFileLineIterator('https://cdn.skypack.dev/rxjs@^7.1.0/operators?min');

const upperCaseLines = new IterableObserver(
  from(lineIterator).pipe(
    take(6),
    map(line => line.toLocaleUpperCase()),
  ),
);

Codepen demo
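
For comparison, the same "first N lines in uppercase" task can also be sketched with plain async generators, without converting to an Observable and back. The helpers `takeAsync` and `mapAsync` and the source `demoLines` are hypothetical; `demoLines` only stands in for `makeTextFileLineIterator`, whose implementation is not shown here.

```javascript
// Apply fn lazily to each item of an async iterable.
async function* mapAsync(source, fn) {
  for await (const item of source) yield fn(item);
}

// Yield at most `count` items. Returning from inside for await...of also
// closes the source iterator, so nothing is read beyond what is needed.
async function* takeAsync(source, count) {
  if (count <= 0) return;
  let taken = 0;
  for await (const item of source) {
    yield item;
    if (++taken >= count) return;
  }
}

// Hypothetical stand-in for makeTextFileLineIterator.
async function* demoLines() {
  yield 'alpha';
  yield 'beta';
  yield 'gamma';
}

// First two lines, in uppercase.
function firstTwoUpperCase() {
  return mapAsync(takeAsync(demoLines(), 2), line => line.toLocaleUpperCase());
}

(async () => {
  for await (const line of firstTwoUpperCase()) console.log(line);
})();
```

This stays pull-based from end to end, so no buffer is needed. The RxJS route starts to pay off once operators are involved that are tedious to write by hand, such as time-based or combining ones.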

Pascale answered 5/7, 2021 at 10:48 Comment(11)
What is the purpose of mixing up generators, observables and promises? These are three different async abstractions. In your demo code you switch from Observable to generator to promise. As a general approach I would suggest sticking to just one abstraction; it improves readability, maintainability, etc. For example, say we stick to RxJS and add the processing as part of the pipe:
```
fromEvent(document, 'DOMContentLoaded')
  .pipe(
    first(),
    switchMap(() => interval(300)),
    take(40)
    // ... rest operators
  )
  .subscribe(res => console.log(res))
```
- Teutonism
@Teutonism Just to borrow the operators from RxJS and use them with asynchronous iterables. You see, someone has found a reason to create an Observable from an iterable, too. - Pascale
Do you mean to wrap an observable as a generator? - Teutonism
@Teutonism I mean to wrap an observable into an asynchronous iterable. Generators create iterables based on arguments. - Pascale
I can't see much benefit in doing so. It is already quite rare that we need a generator in our code. Maybe you can show us how you would use it and ask for an alternative pattern. - Parrakeet
A generator is just a thing that produces the iterable. We could just as well define it as a regular function, but that would complicate the code. The generator syntax is specially designed for creating iterables. - Pascale
@Pascale What is the benefit of an asynchronous iterable over the already existing observable? An observable can cover any of your async scenarios. - Teutonism
@Pascale You should raise it as a feature request against RxJS, to add an operator that takes an observable and returns an async iterable. - Isabelleisac
@Teutonism Here are some examples of the usage of asynchronous iterators: blog.risingstack.com/async-iterators-in-node-js - Pascale
Related: #44123646 #55012520 - Pascale
@Pascale If you want to process an iterable directly, without converting it into any synthetic type (like Observable), check out iter-ops; it offers a sufficient set of processing operators. - Isabelleisac
