Sometimes I'd like to use the RxJS operators to manipulate an endless asynchronous iterable without buffering the values. It is easy to turn an iterable into an Observable. Are there downsides in the following approach to turn an Observable into an asynchronous iterable?
/**
 * Adapts an Observable-like source into an async iterable.
 *
 * Every notification (next / error / complete) is queued in arrival order and
 * replayed to the consumer, so:
 *  - values emitted before the consumer asks for them are kept,
 *  - an error is rethrown to the consumer after the values that preceded it,
 *    even when it arrives while nobody is waiting,
 *  - completion ends the iteration, including while the consumer is waiting.
 *
 * The subscription is released when iteration ends for any reason
 * (completion, error, or the consumer leaving a `for await` loop early).
 *
 * @param {{subscribe: Function}} observable - Object whose `subscribe(observer)`
 *   accepts `{ next, error, complete }` and returns an object with `unsubscribe()`.
 * @yields {*} Each value passed to the observer's `next`.
 * @throws {*} Whatever the source passed to the observer's `error`.
 */
const iterable = async function* (observable) {
  // Notifications not yet consumed, oldest first.
  const queue = [];
  // Resolver of the promise the generator is parked on, if any.
  let wake;

  const enqueue = (notification) => {
    queue.push(notification);
    if (wake) {
      const resume = wake;
      wake = undefined;
      resume();
    }
  };

  const subscription = observable.subscribe({
    next: (value) => enqueue({ kind: 'next', value }),
    error: (error) => enqueue({ kind: 'error', error }),
    complete: () => enqueue({ kind: 'complete' }),
  });

  try {
    while (true) {
      if (queue.length === 0) {
        // Park until the source delivers the next notification.
        await new Promise((resolve) => {
          wake = resolve;
        });
      }
      const notification = queue.shift();
      if (notification.kind === 'complete') {
        return;
      }
      if (notification.kind === 'error') {
        throw notification.error;
      }
      yield notification.value;
    }
  } finally {
    // Runs on completion, on error, and when the consumer stops early.
    subscription.unsubscribe();
  }
};
Here is an alternative implementation of the iterable as an Observer (without using a generator).
/**
 * Observer that exposes the notifications it receives as an async iterable.
 *
 * Values received while no consumer is waiting are buffered. An error received
 * while no consumer is waiting is remembered and rethrown once the buffered
 * values have been consumed. Completion ends the iteration, including for a
 * consumer that is already waiting.
 *
 * Only one outstanding `next()` call on the iterator is supported at a time:
 * a second concurrent call replaces the first pending resolver.
 */
class IterableObserver {
  /**
   * @param {{subscribe: Function}} [observable] - Optional source. When given,
   *   this instance is subscribed to it immediately and the returned
   *   subscription is kept so it can be released on early exit.
   */
  constructor(observable) {
    this.buffer = [];
    this.resolve = undefined;
    this.reject = undefined;
    this.isStopped = false;
    // Error received while no consumer was waiting; delivered on a later next().
    this.hasError = false;
    this.thrownError = undefined;
    this.subscription = observable ? observable.subscribe(this) : undefined;
  }

  /**
   * @returns {{next: Function, return: Function}} Async iterator over the
   *   received values. All iterators created from one instance share its state.
   */
  [Symbol.asyncIterator]() {
    return {
      next: async () => {
        if (this.buffer.length > 0) {
          return { value: this.buffer.shift(), done: false };
        }
        if (this.hasError) {
          // Deliver the stored error exactly once, after the buffered values.
          const failure = this.thrownError;
          this.hasError = false;
          this.thrownError = undefined;
          throw failure;
        }
        if (this.isStopped) {
          return { value: undefined, done: true };
        }
        return new Promise((_resolve, _reject) => {
          this.resolve = _resolve;
          this.reject = _reject;
        });
      },
      // Called by `for await` when the consumer breaks, returns or throws.
      return: async () => {
        this.isStopped = true;
        this.buffer.length = 0;
        this.hasError = false;
        this.thrownError = undefined;
        if (this.subscription) {
          this.subscription.unsubscribe();
          this.subscription = undefined;
        }
        if (this.resolve) {
          this.resolve({ value: undefined, done: true });
          this.resolve = this.reject = undefined;
        }
        return { value: undefined, done: true };
      },
    };
  }

  /**
   * Observer `next`: hands the value to a waiting consumer or buffers it.
   * Ignored once the observer has stopped.
   * @param {*} value
   */
  next(value) {
    if (this.isStopped) {
      return;
    }
    if (this.resolve) {
      this.resolve({ value, done: false });
      this.resolve = this.reject = undefined;
    } else {
      this.buffer.push(value);
    }
  }

  /**
   * Observer `error`: rejects a waiting consumer, or stores the error so the
   * next consumer call receives it instead of a silent end of iteration.
   * @param {*} e
   */
  error(e) {
    if (this.isStopped) {
      return;
    }
    this.isStopped = true;
    if (this.reject) {
      this.reject(e);
      this.resolve = this.reject = undefined;
    } else {
      this.hasError = true;
      this.thrownError = e;
    }
  }

  /**
   * Observer `complete`: marks the stream as finished and releases a consumer
   * that is currently waiting, so it does not hang on a promise that would
   * otherwise never settle.
   */
  complete() {
    this.isStopped = true;
    if (this.resolve) {
      this.resolve({ value: undefined, done: true });
      this.resolve = this.reject = undefined;
    }
  }
}
The benefit of this approach was questioned. Suppose you have an API that provides an asynchronous iterable of text file lines through the function makeTextFileLineIterator, and you need to provide your client with an asynchronous iterable of the first N lines in uppercase. How would you do that? With the RxJS operators and the iterable conversion it is easy:
// Async iterable of the lines of the remote text file.
const lineIterator = makeTextFileLineIterator(
  'https://cdn.skypack.dev/rxjs@^7.1.0/operators?min',
);

// Observable pipeline: keep only the first six lines and upper-case each one.
const firstSixUpperCased = from(lineIterator).pipe(
  take(6),
  map((line) => line.toLocaleUpperCase()),
);

// Convert the resulting Observable back into an async iterable for the client.
const upperCaseLines = new IterableObserver(firstSixUpperCased);