Combine asynchronous iterables, via native promises

I'm stuck trying to implement combine logic for a list of mixed iterables. I have a list made up of Iterable + Iterator + AsyncIterable + AsyncIterator values, and I'm trying to combine them so that the output matches what RxJS's combineLatestWith produces.

Link to the source, plus the same below (my docs for the operator):

(See link to the complete playground at the bottom)

function combineAsync<T>(iterable: AsyncIterable<T>, ...values: AnyIterable<T>[]): AsyncIterable<any[]> {
    return {
        [Symbol.asyncIterator](): AsyncIterator<T[]> {
            const list: AnyIterator<any>[] = [
                iterable[Symbol.asyncIterator](),
                ...values.map((v: any) => typeof v[Symbol.iterator] === 'function' ? v[Symbol.iterator]() :
                    (typeof v[Symbol.asyncIterator] === 'function' ? v[Symbol.asyncIterator]() : v))
            ];
            const pending = new Promise(() => {
                // forever-pending promise
            });
            let start: Promise<IteratorResult<any[]>>, finished: boolean, latest: any[] = new Array(list.length),
                changed = false, finishedCount = 0, lastError: { err: any } | null;
            return {
                next(): Promise<IteratorResult<any>> {
                    if (!start) {
                        start = Promise.all(list.map(a => a.next())).then(all => {
                            const value = [];
                            for (let i = 0; i < all.length; i++) {
                                const m = all[i];
                                if (m.done) {
                                    finished = true;
                                    return m;
                                }
                                value.push(m.value);
                            }
                            latest = [...value];
                            return {value, done: false};
                        });
                        return start;
                    }
                    if (!finished) {
                        const getValues = () => list.map((a, index) => {
                            if (!a) {
                                return pending;
                            }
                            const p = a.next() as any;
                            const it = typeof p.then === 'function' ? p : Promise.resolve(p);
                            return it.then((v: any) => {
                                if (v.done) {
                                    list[index] = null as any; // stop requesting values;
                                    if (++finishedCount === list.length) {
                                        return true; // the end;
                                    }
                                    return pending;
                                }
                                latest[index] = v.value;
                                changed = true;
                            }).catch((err: any) => {
                                lastError = lastError || {err};
                            });
                        });
                        return start
                            .then(() => {
                                if (lastError) {
                                    const r = Promise.reject(lastError.err);
                                    lastError = null;
                                    return r;
                                }
                                if (changed) {
                                    changed = false;
                                    return {value: [...latest], done: false};
                                }
                                return Promise.race(getValues()).then(end => {
                                    if (end) {
                                        finished = true;
                                        return {value: undefined, done: true};
                                    }
                                    changed = false;
                                    return {value: [...latest], done: false};
                                });
                            });
                    }
                    return Promise.resolve({value: undefined, done: true});
                }
            };
        }
    };
}
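
The first statement of the function above turns every input into an iterator, and getValues later wraps synchronous results in a promise. A minimal sketch of that normalization as a standalone helper is shown below. The names AnySource and toAsyncIterator are hypothetical (the post uses AnyIterable<T> without showing its definition), and checking Symbol.asyncIterator before Symbol.iterator is a choice made for this sketch, not something the original code does.

```typescript
// Hypothetical union type standing in for the post's AnyIterable<T>.
type AnySource<T> = Iterable<T> | Iterator<T> | AsyncIterable<T> | AsyncIterator<T>;

// Normalize any of the four source kinds into an AsyncIterator<T>.
function toAsyncIterator<T>(source: AnySource<T>): AsyncIterator<T> {
    const s = source as any;
    if (typeof s[Symbol.asyncIterator] === 'function') {
        return s[Symbol.asyncIterator]();
    }
    // Either a sync iterable, or a bare iterator whose next() may or may not return a promise.
    const it: { next(): any } =
        typeof s[Symbol.iterator] === 'function' ? s[Symbol.iterator]() : s;
    return {
        // Promise.resolve() passes promises through and wraps plain results.
        next: (): Promise<IteratorResult<T>> => Promise.resolve(it.next())
    };
}
```

With this shape, every source can be driven through the same promise-returning next(), so the rest of the combine logic does not need to test for `.then`.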

So when I pass 3 parameters: p1, p2(8), p3(7), defined as below...

const p1 = [1, 2, 3, 4]; // converted to async iterable

const p2 = async function* evenNumbers(maxEven: number): AsyncIterableIterator<number> {
    for (let i = 2; i <= maxEven; i += 2) {
        yield new Promise<number>(resolve => {
            setTimeout(() => resolve(i), 10);
        });
    }
};

const p3 = async function* oddNumbers(maxOdd: number): AsyncIterableIterator<number> {
    for (let i = 1; i <= maxOdd; i += 2) {
        yield new Promise<number>(resolve => {
            setTimeout(() => resolve(i), 5);
        });
    }
};

...I was expecting to get something like this:

[1, 2, 1] 
[2, 2, 1] 
[3, 2, 1] 
[4, 2, 1] 
[4, 2, 3] 
[4, 4, 3] 
[4, 4, 5] 
[4, 4, 7] 
[4, 6, 7] 
[4, 8, 7] 

but instead, I'm getting the following:

[1, 2, 1] 
[2, 2, 1] 
[3, 2, 1] 
[4, 2, 1]

I've spent hours debugging this asynchronous monster, but couldn't figure out how updates from async iterables fail to reach Promise.race calls that follow.

Any help is much appreciated!

Here's the complete playground.

UPDATE

To demonstrate that the right values do exist in the code, here's a version with the main console.log commented out and logging added in two other places inside the main function.

Cuculiform answered 19/12, 2021 at 9:35 Comment(8)
You're calling getValues() on every .next() call after the first, which advances all iterators, not just the one that won the last race. - Gemina
@Gemina That's why I have each then update the latest values. The method's logic is to produce the latest combinations, not every possible logical combination. For the latter, I have the synchronous version, which does exactly that; for the asynchronous one the logic is a little different, like in RxJS. - Cuculiform
Yes, but in your expected result only one of the values changes at a time; in your code, all three would change at once. Did you do some debugging by putting a console.log('getting next value from iterator '+i) before the .next() call, and one in the .then() callback? - Gemina
@Gemina Just added an update with the modified version to show the right values are there; see console.log(latest); // LOGGING - Cuculiform
@Gemina Do you think it's something major to fix the implementation to match the expectation? I appreciate your help with this! - Cuculiform
Yes, I think I'd write this in a different way. I also wonder whether it is a good idea to put it in the same module as the synchronous combine, which is better known as zip and works very differently (never repeating a value). - Gemina
@Gemina I have zip done and documented. It's different, because it stops as soon as one iterable stops (no combining), whereas combine keeps going, producing all possible combinations. Combine for synchronous iterables was easy; I'm just struggling with the async one. - Cuculiform
@Gemina This is still quite interesting, so I've thrown in a bounty, in case you decide to take a crack at it ;) - Cuculiform
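
The effect described in the first comment can be reproduced in isolation. The sketch below is not the code from the question; fast, slow and naiveRaceDemo are hypothetical names, and the two generators are made up for the demo. It requests a fresh value from every source on each round and races those requests, the way getValues() does, then checks what the slower source hands out afterwards.

```typescript
// Hypothetical sources: one settles in microtasks, the other waits on a timer.
async function* fast(): AsyncGenerator<number> {
    for (let i = 1; i <= 5; i++) yield i;
}
async function* slow(): AsyncGenerator<number> {
    for (let i = 1; i <= 5; i++) {
        await new Promise<void>(resolve => setTimeout(resolve, 5));
        yield i;
    }
}

async function naiveRaceDemo(): Promise<{ winners: string[]; nextSlow: number | undefined }> {
    const sources = [fast(), slow()];
    const names = ['fast', 'slow'];
    const winners: string[] = [];
    for (let round = 0; round < 3; round++) {
        // Naive: ask EVERY source for a new value on every round.
        const requests = sources.map((s, i) => s.next().then(r => `${names[i]}:${r.value}`));
        winners.push(await Promise.race(requests));
    }
    // The three slow requests that lost their races are still queued; they will
    // consume slow's values 1, 2 and 3, so a fourth request receives 4.
    const r = await sources[1].next();
    return { winners, nextSlow: r.done ? undefined : r.value };
}

naiveRaceDemo().then(({ winners, nextSlow }) => {
    console.log(winners);  // ['fast:1', 'fast:2', 'fast:3']
    console.log(nextSlow); // 4
});
```

The losing requests are not cancelled when a race settles. An async generator queues every next() call, so each lost race still uses up one value of the slower source, and nothing ever receives that value as a race winner.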

Vitaly, you've posed an interesting problem. :) It's pretty tricky to reuse promises that were already launched in Promise.race(), but it is possible.

Errors and rejections are not handled here; if everything else works, that code can be added later.
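
To make the idea of reusing promises already launched in Promise.race() concrete before the full class below, here is a minimal sketch. It is not the CachedIterator code: combineLatestSketch, Tagged, request and pending are names made up here. It assumes every source is already an AsyncIterable, it does not handle rejections or call return() on abandoned iterators, and it makes no attempt to reproduce the exact sequence expected in the question (a fast source may advance several times before the first combination is emitted).

```typescript
// Hypothetical name; a compact sketch under the assumptions stated above.
async function* combineLatestSketch<T>(...sources: AsyncIterable<T>[]): AsyncGenerator<T[]> {
    const iterators = sources.map(s => s[Symbol.asyncIterator]());
    type Tagged = { index: number; result: IteratorResult<T> };

    // Request one value from the iterator at `index`, remembering where it came from.
    const request = (index: number): Promise<Tagged> =>
        iterators[index].next().then(result => ({ index, result }));

    // At most one in-flight request per source; finished sources are removed.
    const pending = new Map<number, Promise<Tagged>>();
    iterators.forEach((_, i) => pending.set(i, request(i)));

    const latest: T[] = new Array(iterators.length);
    const seen = new Set<number>();

    while (pending.size > 0) {
        // Losers stay in `pending` and take part in the next race unchanged.
        const { index, result } = await Promise.race(pending.values());
        if (result.done) {
            pending.delete(index);
            continue;
        }
        latest[index] = result.value;
        seen.add(index);
        // Only the winner is asked for another value.
        pending.set(index, request(index));
        // Emit only once every source has produced at least one value.
        if (seen.size === iterators.length) {
            yield [...latest];
        }
    }
}
```

The key difference from racing fresh next() calls is that each source has exactly one outstanding request at any time, so no value is pulled from a source unless its previous value has already been recorded.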

class CachedIterator<T>{
    protected lastValue: T | undefined; 
    protected lastValueFetched: boolean = false;

    public _done = false; 

    protected cachedIteration: Promise<() => IteratorResult<T>> | undefined;

    protected iterator: AsyncIterator<T>;
    constructor(iterable: AsyncIterable<T>, protected id?: string){
        const v = iterable as any;
        this.iterator = 
        (typeof v[Symbol.iterator] === 'function' ? v[Symbol.iterator]() :
        (typeof v[Symbol.asyncIterator] === 'function' ? v[Symbol.asyncIterator]() : v)) as AsyncIterator<T>

    }
     
    
    async next(): Promise<(() => IteratorResult<T>) | undefined>{
        if(this._done) return undefined;
        if(!this.cachedIteration){
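            // Promise.resolve() also covers sync iterators, whose next() returns a plain result
            this.cachedIteration = Promise.resolve(this.iterator.next()).then(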
                (result)=> { 
                    return () => {
                        this.fetch(result);
                        return result;
                    }      
                }
            )
        } 
        return this.cachedIteration
    }
    async nextAndFetch() {
        const fetch = await this.next();
        if(fetch) fetch();
    }
    protected fetch(result: IteratorResult<T>){
        this.cachedIteration = undefined;

        this.lastValueFetched = true; 
        if(result.done){
            this._done = true;
            if (result.value !== undefined){
               this.lastValue = result.value; 
            }
            return;
        }
        this.lastValue = result.value;
        //console.log("AWAITED next Value:", iteration ) 
    }

    async last(): Promise<T> {
        if(!this.lastValueFetched){
            //console.log("no first value, request Next");
            await this.nextAndFetch();
        }
        return this.lastValue!;
    }
    done(){
        return this._done
    }
}



function combineAsync<T>(...values: AnyIterable<T>[]): AsyncIterable<any[]> {
    return {
        [Symbol.asyncIterator](): AsyncIterator<T[]> {
            let done = false;
            
            const list: CachedIterator<T>[] = 
                values.map((v: any, id) => new CachedIterator<T>(v, 'id' + id));
            //console.log("LIST", list);
            return {
                async next() {
                    let skipDoneIteration = true;
                    // Flag: keep looping while the winning result is a final "done" one that must be skipped
                    while(skipDoneIteration){

                        if( list.every( f => f.done() ) ){
                            return { done: true, value: undefined }
                        }
                        /* Promise.race is the main problem here, and it is obligatory.
                        We launch promises for the race; one of them wins and its result is cached,
                        while the others will settle someday, maybe before we run the next race.

                        So we need to separate getting the async iteration results from
                        fetching them: dropping the iteration cache and converting the
                        IteratorResult into the last() value.

                        Each iteration of combineAsync must therefore result in exactly one fetch.

                        */
                        skipDoneIteration = false;
                        const result = await Promise.race( 
                            list.filter(a => !a.done() ).map( a => a.next() )
                        ).then( fetch => {
                            if(fetch){
                                return fetch();
                            }
                            return undefined;
                        });
                        if(result){
                            skipDoneIteration = !!result.done;
                            /*
                             Another problem is the final results of the form { done: true, value: undefined };
                             we must skip them.
                            */
                        } 
                    }

                    return Promise.all( list.map(a => a.last())).then(values => {
                        return {value: values, done: false}
                    });
                    
                }
            }
        }
    }
}   

Playground Link

Playground previous Link

Bursar answered 23/12, 2021 at 13:48 Comment(5)
Interesting :) I will need to go through it first. All I know for now is that the implementation absolutely requires the use of Promise.race, because we do have async race logic there. - Cuculiform
Updated the code. I tried to reduce the layers from next() + fetch() + last() to next() + last(), but I can't. It seems that each iteration of combineAsync must have at least one Promise.race + fetch. - Bursar
Output from the updated playground doesn't look right; it spits out a lot of undefined values. - Cuculiform
Updated the code; fixed. - Bursar
Gave you the bounty for trying, but I'm still hoping to see more answers before accepting it :) - Cuculiform
