How to compose transform Streams in node.js

I have a csv parser implemented as a series of transform streams:

process.stdin
    .pipe(iconv.decodeStream('win1252'))
    .pipe(csv.parse())
    .pipe(buildObject())
    .pipe(process.stdout);

I'd like to abstract the parser (in its own module) and be able to do:

process.stdin
    .pipe(parser)
    .pipe(process.stdout);

where parser is just the composition of the previously used transform streams.

If I do

var parser = iconv.decodeStream('win1252')
    .pipe(csv.parse())
    .pipe(buildObject());

then parser is set to the buildObject() stream, because .pipe() returns its destination, and only this last transform stream receives the data.

If I do

var parser = iconv.decodeStream('win1252');
parser
    .pipe(csv.parse())
    .pipe(buildObject());

it doesn't work either: parser still refers to the first stream, so .pipe(process.stdout) is called on it directly and the other two transform streams are bypassed.

Any recommendation for an elegant composition of streams?

Buzzell answered 16/5, 2014 at 14:27 Comment(0)

Unfortunately, there is no built-in way to do that, but there is the cool multipipe package. Use it like this:

var multipipe = require('multipipe');

var parser = multipipe(iconv.decodeStream('win1252'), csv.parse(), buildObject());
Boorer answered 16/5, 2014 at 15:27 Comment(2)
Same. This is brilliant. Also, multipipe's source code is very short, so it's worth checking out the implementation for anyone who values a look under the hood. – Gyrostabilizer
There is also pumpify, which similarly allows you to compose multiple streams. – Schistosomiasis

As of 2022 and Node.js v16, there is a new compose function in the stream module that builds a Duplex stream from a list of streams.

See: https://nodejs.org/api/stream.html#streamcomposestreams

It works with both .pipe() and async iteration syntax.

Pathan answered 1/3, 2022 at 8:32 Comment(0)

I've been struggling with this issue (and some others!) and found that highlandjs solved nearly all my problems. In this case, its pipeline function did the trick:

var h = require('highland');
var parser = h.pipeline(iconv.decodeStream('win1252'), csv.parse(), buildObject());
Rockafellow answered 3/6, 2014 at 13:0 Comment(1)
I would've sold my first child for this answer. Thank you! – Retail

I think this can be done natively now.

const { PassThrough, Transform } = require('stream');

const compose = (...streams) => {
  const first = new PassThrough();
  const last = new PassThrough();
  const result = new Transform();

  // Chain the streams together and forward their errors to the wrapper.
  [first, ...streams, last].reduce(
    (chain, stream) => (
      stream.on('error', (error) => result.emit('error', error)),
      chain.pipe(stream)
    ),
  );

  // Caveat: this pairs each written chunk with exactly one output chunk,
  // so it only works when the inner pipeline is strictly 1:1.
  result._transform = (chunk, enc, cb) => {
    last.once('data', (chunk) => cb(null, chunk));
    first.push(chunk, enc);
  };

  result._flush = (cb) => {
    last.once('end', () => cb(null));
    first.push(null);
  };

  return result;
};
Holothurian answered 5/8, 2020 at 19:40 Comment(0)

While stream.compose is still experimental, you can use a helper like the one below to wrap all the streams into a single stream. The resulting stream is composable into other compositions at any position.

import { pipeline, Transform } from 'stream'

export const compose = (...streams) => {
  const first = streams[0]

  // pipeline() wires the streams together and returns the last one.
  // The empty callback keeps a pipeline error from going unhandled;
  // errors surfacing on the last stream are forwarded below.
  const last = pipeline(
    ...streams,
    function emptyErrorHandler() {}
  )

  const stream = new Transform({
    writableObjectMode: first.writableObjectMode,
    readableObjectMode: last.readableObjectMode,
    transform(chunk, enc, cb) {
      first.write(chunk, enc, cb)
    },
    flush(cb) {
      first.end()
      last.once('finish', cb)
    }
  })

  // Forward output and errors from the inner pipeline to the wrapper.
  last.on('data', chunk => stream.emit('data', chunk))
  last.once('error', err => stream.emit('error', err))

  return stream;
}
Devaughn answered 14/7 at 16:9 Comment(0)
