Creating a Node.js stream from two piped streams

I'd like to combine two Node.js streams into one by piping them, if possible. I'm using Transform streams.

In other words, I'd like my library to return myStream for people to use. For example, they could write:

process.stdin.pipe(myStream).pipe(process.stdout);

Internally, I'm using a third-party vendorStream that does some work, plugged into my own logic contained in myInternalStream. So the above would translate to:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout);

Can I do something like that? I've tried var myStream = vendorStream.pipe(myInternalStream), but that doesn't work: .pipe() returns its destination stream, so myStream would just be myInternalStream.
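
Here's a minimal illustration of why that fails, using two throwaway PassThrough streams in place of the real ones:

var PassThrough = require('stream').PassThrough;

var vendorStream = new PassThrough();
var myInternalStream = new PassThrough();

// pipe() returns its destination, not a combined stream.
var myStream = vendorStream.pipe(myInternalStream);
console.log(myStream === myInternalStream); // true

// So process.stdin.pipe(myStream) would write straight into
// myInternalStream, bypassing vendorStream entirely.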

To make an analogy with bash, let's say I want to write a program that checks if the letter h is present in the last line of some stream (tail -n 1 | grep h). I can create a shell script:

# myscript.sh
tail -n 1 | grep h

And then if people do:

$ printf "abc\ndef\nghi" | . myscript.sh

It just works.

This is what I have so far:

// Combine a pipe of two streams into one stream

var Transform = require('stream').Transform;

// stream1 re-chunks incoming data into complete lines, buffering
// any trailing partial line until more data arrives.
var chunks1 = [];
var stream1 = new Transform();
var soFar = '';
stream1._transform = function(chunk, encoding, done) {
  chunks1.push(chunk.toString());
  var pieces = (soFar + chunk).split('\n');
  soFar = pieces.pop();
  for (var i = 0; i < pieces.length; i++) {
    this.push(pieces[i]);
  }
  return done();
};

// stream2 prefixes each line with a running counter.
var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
  chunks2.push(chunk.toString());
  count = count + 1;
  this.push(count + ' ' + chunk.toString() + '\n');
  done();
};

var stdin = process.stdin;
var stdout = process.stdout;

// chunks1 and chunks2 record the raw input each stream saw,
// dumped here for debugging.
process.on('exit', function () {
    console.error('chunks1: ' + JSON.stringify(chunks1));
    console.error('chunks2: ' + JSON.stringify(chunks2));
});
// Exit quietly if stdout is closed early (e.g. piped to `head`).
process.stdout.on('error', process.exit);


// stdin.pipe(stream1).pipe(stream2).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

// Best working solution I could find
var stream3 = function(src) {
  return src.pipe(stream1).pipe(stream2);
};
stream3(stdin).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

Is this at all possible? Let me know if what I'm trying to do isn't clear.

Thanks!

Cracker answered 4/7, 2013 at 13:32

You can watch for something to be piped to your stream, and then unpipe it and pipe it to the streams you're interested in:

var PassThrough = require('stream').PassThrough;

var stream3 = new PassThrough();

// When a source stream is piped to us, undo that pipe, and instead
// save off the result of piping that source through our internal streams.
stream3.on('pipe', function(source) {
  source.unpipe(this);
  this.transformStream = source.pipe(stream1).pipe(stream2);
});

// When we're piped to another stream, instead pipe our internal
// transform stream to that destination.
stream3.pipe = function(destination, options) {
  return this.transformStream.pipe(destination, options);
};

stdin.pipe(stream3).pipe(stdout);

You can extract this functionality into your own constructable stream class:

var util = require('util');
var PassThrough = require('stream').PassThrough;

var StreamCombiner = function() {
  PassThrough.call(this);
  this.streams = Array.prototype.slice.apply(arguments);

  this.on('pipe', function(source) {
    source.unpipe(this);
    for (var i = 0; i < this.streams.length; i++) {
      source = source.pipe(this.streams[i]);
    }
    this.transformStream = source;
  });
};

util.inherits(StreamCombiner, PassThrough);

StreamCombiner.prototype.pipe = function(dest, options) {
  return this.transformStream.pipe(dest, options);
};

var stream3 = new StreamCombiner(stream1, stream2);
stdin.pipe(stream3).pipe(stdout);
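
One caveat, also raised in the comments below: transformStream is reassigned on every 'pipe' event, so piping a second source into the same combiner replaces the first chain. A variant that builds the chain once in the constructor might look like the following sketch (StreamCombiner2 is just an illustrative name, not part of any library):

var util = require('util');
var PassThrough = require('stream').PassThrough;

var StreamCombiner2 = function() {
  PassThrough.call(this);
  var streams = Array.prototype.slice.apply(arguments);

  // Build the internal chain once, fronted by a PassThrough that
  // incoming sources are re-routed into.
  this.input = new PassThrough();
  this.transformStream = streams.reduce(function(source, stream) {
    return source.pipe(stream);
  }, this.input);

  this.on('pipe', function(source) {
    source.unpipe(this);
    // Default end semantics: the first source to end ends the chain.
    source.pipe(this.input);
  });
};

util.inherits(StreamCombiner2, PassThrough);

StreamCombiner2.prototype.pipe = function(dest, options) {
  return this.transformStream.pipe(dest, options);
};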
Technic answered 4/7, 2013 at 18:48
Thanks a lot @brandon, this is awesome! Updated my Gist: gist.github.com/nicolashery/5910969 (Cracker)
This is awesome. I was thinking about doing something similar, but I just didn't have confidence that I wasn't missing some subtlety that would make my solution wrong. Thanks for the confidence. (Spirit)
FWIW, for this solution to work, you're required to pipe a source (stdin in this case) into stream3 before piping stream3 to stdout. So, no stream3.pipe(stdout); stream3.write(data); But this is a great help! Thank you! (Isidore)
Turns out, stream3 is a transform stream, so it doesn't have a write method, anyway. (Isidore)
Won't this break when you pipe multiple streams into the same StreamCombiner object? I think this.transformStream should be created once in the constructor, not overwritten on each pipe event. (Frohne)
Is there any library providing that functionality? (Thevenot)

One option is to use multipipe, which lets you chain multiple transforms together, wrapped as a single transform stream:

// my-stream.js
var multipipe = require('multipipe');

module.exports = function createMyStream() {
  return multipipe(vendorStream, myInternalStream);
};

Then you can do:

var createMyStream = require('./my-stream');

var myStream = createMyStream();

process.stdin.pipe(myStream).pipe(process.stdout);

Clarification: this makes stdin go through vendorStream, then myInternalStream, and finally out to stdout.
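
For a self-contained sketch, here are two trivial Transform streams standing in for vendorStream and myInternalStream (the uppercasing and bracketing logic is purely illustrative):

var multipipe = require('multipipe');
var Transform = require('stream').Transform;

// Stand-in for vendorStream: uppercases each chunk.
var vendorStream = new Transform();
vendorStream._transform = function(chunk, encoding, done) {
  this.push(chunk.toString().toUpperCase());
  done();
};

// Stand-in for myInternalStream: wraps each chunk in brackets.
var myInternalStream = new Transform();
myInternalStream._transform = function(chunk, encoding, done) {
  this.push('[' + chunk.toString() + ']');
  done();
};

var myStream = multipipe(vendorStream, myInternalStream);
process.stdin.pipe(myStream).pipe(process.stdout);

// $ printf "hello" | node example.js
// [HELLO]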

Calliper answered 5/8, 2016 at 10:35
You can apparently also use npmjs.com/package/lazypipe, with a slightly different API. (Calliper)

There's now a built-in function, stream.compose, to achieve this (currently marked as Stability 1: Experimental, though it has been in Node.js since version 16.9.0):

const stream = require('stream');
const myStream = stream.compose(vendorStream, myInternalStream);
process.stdin.pipe(myStream).pipe(process.stdout);

Combines two or more streams into a Duplex stream that writes to the first stream and reads from the last. [...]

Because stream.compose returns a new stream that in turn can (and should) be piped into other streams, it enables composition. In contrast, when passing streams to stream.pipeline, typically the first stream is a readable stream and the last a writable stream, forming a closed circuit.
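
For comparison, a minimal sketch of the stream.pipeline counterpart mentioned in that quote: it wires a closed circuit from source to sink and reports errors via a callback, but hands back no reusable middle stream (vendorStream and myInternalStream are the question's transforms):

const { pipeline } = require('stream');

// pipeline consumes the whole chain end to end; there is no
// intermediate stream left over to return to callers.
pipeline(process.stdin, vendorStream, myInternalStream, process.stdout, (err) => {
  if (err) console.error('pipeline failed:', err);
});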

Ovation answered 25/2 at 19:25
