I got stuck trying to implement combine logic for a list of mixed iterables, i.e. I have a list of Iterable + Iterator + AsyncIterable + AsyncIterator values, which I'm trying to combine so that the output is the same as with RxJS's combineLatestWith.
Link to the source, plus the same below (my docs for the operator):
(See link to the complete playground at the bottom)
function combineAsync<T>(iterable: AsyncIterable<T>, ...values: AnyIterable<T>[]): AsyncIterable<any[]> {
    return {
        [Symbol.asyncIterator](): AsyncIterator<T[]> {
            const list: AnyIterator<any>[] = [
                iterable[Symbol.asyncIterator](),
                ...values.map((v: any) => typeof v[Symbol.iterator] === 'function' ? v[Symbol.iterator]() :
                    (typeof v[Symbol.asyncIterator] === 'function' ? v[Symbol.asyncIterator]() : v))
            ];
            const pending = new Promise(() => {
                // forever-pending promise
            });
            let start: Promise<IteratorResult<any[]>>, finished: boolean, latest: any[] = new Array(list.length),
                changed = false, finishedCount = 0, lastError: { err: any } | null;
            return {
                next(): Promise<IteratorResult<any>> {
                    if (!start) {
                        start = Promise.all(list.map(a => a.next())).then(all => {
                            const value = [];
                            for (let i = 0; i < all.length; i++) {
                                const m = all[i];
                                if (m.done) {
                                    finished = true;
                                    return m;
                                }
                                value.push(m.value);
                            }
                            latest = [...value];
                            return {value, done: false};
                        });
                        return start;
                    }
                    if (!finished) {
                        const getValues = () => list.map((a, index) => {
                            if (!a) {
                                return pending;
                            }
                            const p = a.next() as any;
                            const it = typeof p.then === 'function' ? p : Promise.resolve(p);
                            return it.then((v: any) => {
                                if (v.done) {
                                    list[index] = null as any; // stop requesting values;
                                    if (++finishedCount === list.length) {
                                        return true; // the end;
                                    }
                                    return pending;
                                }
                                latest[index] = v.value;
                                changed = true;
                            }).catch((err: any) => {
                                lastError = lastError || {err};
                            });
                        });
                        return start
                            .then(() => {
                                if (lastError) {
                                    const r = Promise.reject(lastError.err);
                                    lastError = null;
                                    return r;
                                }
                                if (changed) {
                                    changed = false;
                                    return {value: [...latest], done: false};
                                }
                                return Promise.race(getValues()).then(end => {
                                    if (end) {
                                        finished = true;
                                        return {value: undefined, done: true};
                                    }
                                    changed = false;
                                    return {value: [...latest], done: false};
                                });
                            });
                    }
                    return Promise.resolve({value: undefined, done: true});
                }
            };
        }
    };
}
So when I pass 3 parameters: p1, p2(8), p3(7), defined as below...
const p1 = [1, 2, 3, 4]; // converted to async iterable
const p2 = async function* evenNumbers(maxEven: number): AsyncIterableIterator<number> {
    for (let i = 2; i <= maxEven; i += 2) {
        yield new Promise<number>(resolve => {
            setTimeout(() => resolve(i), 10);
        });
    }
};
const p3 = async function* oddNumbers(maxOdd: number): AsyncIterableIterator<number> {
    for (let i = 1; i <= maxOdd; i += 2) {
        yield new Promise<number>(resolve => {
            setTimeout(() => resolve(i), 5);
        });
    }
};
...I was expecting to get something like this:
[1, 2, 1]
[2, 2, 1]
[3, 2, 1]
[4, 2, 1]
[4, 2, 3]
[4, 4, 3]
[4, 4, 5]
[4, 4, 7]
[4, 6, 7]
[4, 8, 7]
but instead, I'm getting the following:
[1, 2, 1]
[2, 2, 1]
[3, 2, 1]
[4, 2, 1]
I've spent hours debugging this asynchronous monster, but couldn't figure out why updates from the async iterables fail to reach the Promise.race calls that follow.
Any help is much appreciated!
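One behaviour that may be relevant here: every next() call on an async generator is queued and consumes its own value, even if nobody awaits the returned promise. The sketch below is only an illustration of that queueing, not part of the operator; slowCounter and demoQueuedRequests are hypothetical names introduced for the example. It issues four overlapping next() calls, roughly what repeated getValues() calls would do, and then awaits only the newest one.

```typescript
// Hypothetical source: yields 1, 2, 3 with a short delay before each value.
async function* slowCounter(): AsyncGenerator<number> {
    for (let i = 1; i <= 3; i++) {
        await new Promise<void>(resolve => setTimeout(resolve, 5));
        yield i;
    }
}

async function demoQueuedRequests(): Promise<string[]> {
    const it = slowCounter();
    const log: string[] = [];

    // Four overlapping requests; the generator queues them in call order.
    const requests = [it.next(), it.next(), it.next(), it.next()];

    // Awaiting only the newest request: it settles last, after the generator has finished.
    const newest = await requests[3];
    log.push(`newest request: done=${newest.done}`);

    // The three values were delivered to the earlier requests instead.
    const earlier = await Promise.all(requests.slice(0, 3));
    log.push(`earlier values: ${earlier.map(r => r.value).join(',')}`);
    return log;
}
```

If the same thing happens inside combineAsync, a Promise.race over only the newest promises would not settle until the queued earlier requests have drained, which would be consistent with the output stopping at [4, 2, 1].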
Here's the complete playground.
UPDATE
To demonstrate that the right values generally do exist in the code, here's the version with the main console output commented out, and logging added in two other places in the main function instead.
[...] getValues() on every .next() call after the first, which advances all iterators, not just the one that won the last race – Gemina
[...] then update the latest values. And the method logic is to spawn the latest combinations, not every possible logical combination. For the latter, I have the synchronous version of it, which does exactly that, but for asynchronous the logic is a little different, like in RXJS. – Cuculiform
[...] console.log('getting next value from iterator '+i) before the .next() call, and one in the .then() callback? – Gemina
[...] console.log(latest); // LOGGING – Cuculiform
[...] combine which is better known as zip and works very differently (never repeating a value). – Gemina