Convert an Observable to an async generator
Asked Answered
L

2

6

I'm trying to use rxjs in conjunction with babeljs to create an async generator function that yields when next is called, throws when error is called, and finishes when complete is called. The problem I have with this is that I can't yield from a callback.

I can await a Promise to handle the return/throw requirement.

async function *getData( observable ) {
    await new Promise( ( resolve, reject ) => {
        observable.subscribe( {
            next( data ) {
                yield data; // can't yield here
            },
            error( err ) {
                reject( err );
            },
            complete() {
                resolve();
            }
        } );
    } );
}

( async function example() {
    for await( const data of getData( foo ) ) {
        console.log( 'data received' );
    }
    console.log( 'done' );
}() );

Is this possible?

Locally answered 22/5, 2017 at 22:38 Comment(0)
L
6

I asked the rubber duck, then I wrote the following code which does what I wanted:

function defer() {
    const properties = {},
        promise = new Promise( ( resolve, reject ) => {
            Object.assign( properties, { resolve, reject } );
        } );
        return Object.assign( promise, properties );
}

async function *getData( observable ) {
    let nextData = defer();
    const sub = observable.subscribe( {
        next( data ) {
            const n = nextData;
            nextData = defer();
            n.resolve( data );
        },
        error( err ) {
            nextData.reject( err );
        },
        complete() {
            const n = nextData;
            nextData = null;
            n.resolve();
        }
    } );
    try {
        for(;;) {
            const value = await nextData;
            if( !nextData ) break;
            yield value;
        }
    } finally {
        sub.unsubscribe();
    }
}
Locally answered 22/5, 2017 at 23:4 Comment(0)
C
1

I think a problem with this solution is that the observable could generate several values in one batch (without deferring). This is my proposal:

const defer = () => new Promise (resolve =>
    setTimeout (resolve, 0));

async function* getData (observable)
{
    let values = [];
    let error = null;
    let done = false;
    observable.subscribe (
        data => values.push (data),
        err => error = err,
        () => done = true);
    for (;;)
    {
        if (values.length)
        {
            for (const value of values)
                yield value;
            values = [];
        }
        if (error)
            throw error;
        if (done)
            return;
        await defer ();
    }
}
Corvette answered 31/5, 2018 at 4:58 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.