How to implement an async generator to stream with node Readable stream?
Asked Answered
C

2

5

I want to do something like this

const { Readable } = require("stream");

function generatorToStream(generator) {
  return new Readable({
    read() {
      (async () => {
        for await (const result of generator()) {
          if (result.done) {
            this.push(null);
          } else {
            this.push(result.value);
          }
        }
      })();
    }
  });
}

generatorToStream(async function*() {
  const msg1 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg1;
  const msg2 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg2;

  const msg3 = await new Promise(resolve =>
    setTimeout(() => resolve("ola amigao"), 2000)
  );
  yield msg3;
}).pipe(process.stdout);

but it's not working, the end event has never been called and i haven't received any data on my terminal.

Any solution or tips on how to implement it?

Clothesbasket answered 25/2, 2019 at 13:33 Comment(3)
Are you getting any errors, especially unhandled promise rejections? Try appending .catch(console.error) to your IIAFE.Plant
IIRC the result won't be an iteration record when you are using for await of, but the value itself. When the generator is done, the loop just ends.Plant
An async function with await called separately for each Promise is just like a generator. Is it even possible to combine them?Strang
S
5

Very basic working sample below:

import { Readable } from 'stream';

const generator = async function* () {
  yield 1;
  yield 2;
  yield 3;
};

const myReadableStream = Readable.from(generator());
myReadableStream.on('data', (data) => console.log(data));
Spradling answered 5/1 at 12:20 Comment(1)
Best answer in 2024. An external lib for this is not valuable anymore. From official documentation: nodejs.org/api/stream.html#streamreadablefromiterable-optionsAlysa
F
2

I'm the author of Scramjet, a functional stream processing framework that may be an easy solution for you.

If you're ok with adding a total of just 3 dependencies to your project then it couldn't be easier:

const {StringStream} = require("scramjet");

StringStream.from(async function* () {
    yield await something();
    ...
});

If you want to implement this on your own, take a look at the source code in DataStream line 112 - it should be fairly easy to implement. In general you'd need to implement something like this:

function generatorToStream(gen) {
    // obtain an iterator
    const iter = await gen();
    // create your output
    const out = new Passthrough();

    // this IIFE will do all the work
    (async () => {
        let done = false;
        for await (let chunk of iter) {
            // if write returns true, continue, otherwise wait until out is drained.
            if (!out.write(chunk)) await new Promise((res, rej) => this.once("drain", res);
        }
    })()
        // handle errors by pushing them to the stream for example
        .catch(e => out.emit('error', e));

    // return the output stream
    return out;
}

The above example is more or less what's happening in scramjet - there's a little more optimization there on keeping less event handlers and so on, but the above should work well in a simple case.

Fondue answered 27/2, 2019 at 12:52 Comment(1)
Kudos for disclaiming that you are the author of the lib before anything else, concealed self advertisement is very frustrating.Athapaskan

© 2022 - 2024 — McMap. All rights reserved.