Concatenate two (or n) streams

39
  • 2 streams:

    Given readable streams stream1 and stream2, what's an idiomatic (concise) way to get a stream containing stream1 and stream2 concatenated?

    I cannot do stream1.pipe(outStream); stream2.pipe(outStream), because then the stream contents are jumbled together.

  • n streams:

    Given an EventEmitter that emits an indeterminate number of streams, e.g.

    eventEmitter.emit('stream', stream1)
    eventEmitter.emit('stream', stream2)
    eventEmitter.emit('stream', stream3)
    ...
    eventEmitter.emit('end')
    

    what's an idiomatic (concise) way to get a stream with all streams concatenated together?

Sass answered 8/5, 2013 at 1:21 Comment(0)
35

This can be done with vanilla Node.js

import { PassThrough } from 'stream'
const merge = (...streams) => {
    let pass = new PassThrough()
    for (let stream of streams) {
        const end = stream === streams.at(-1);
        pass = stream.pipe(pass, { end })
    }
    return pass
}

Use streams.slice(-1)[0] if you don't have .at() in your version of Node.js
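
For example, a quick usage sketch (the file names here are hypothetical):

import fs from 'fs'

// pipe the concatenation of two files into a third; merge() is defined above
merge(fs.createReadStream('file1.txt'), fs.createReadStream('file2.txt'))
    .pipe(fs.createWriteStream('merged.txt'))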

Arian answered 8/11, 2016 at 8:51 Comment(5)
what if one stream never ends, but the other one doesIgnoble
Just an update: pass.emit('end') is not working. Try pass.end() Azores
Change --waiting to waiting--Blankbook
@TomLarkworthy Is that a response to @PirateApp? If not then I don't see why one should do so as this causes the last stream to never endEpiphenomenalism
This solution is nice but didn't keep the order of streams in my usage for some odd reason. Calling merge(a, b) produced a stream in which b preceded a. Could it be related to the fact that b is a stream of far less items compared to a, and ends first?Epiphenomenalism
20

The combined-stream package concatenates streams. Example from the README:

var CombinedStream = require('combined-stream');
var fs = require('fs');

var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

combinedStream.pipe(fs.createWriteStream('combined.txt'));

I believe you have to append all streams at once. If the queue runs empty, the combinedStream automatically ends. See issue #5.
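
As a comment below points out, you can also hand combined-stream a function that supplies each stream lazily through a callback, so the file descriptors are only opened when that entry's turn comes. A sketch based on the README:

var CombinedStream = require('combined-stream');
var fs = require('fs');

var combinedStream = CombinedStream.create();
// append a function instead of a stream; next() receives
// the freshly created stream when this entry is reached
combinedStream.append(function(next) {
  next(fs.createReadStream('file1.txt'));
});
combinedStream.append(function(next) {
  next(fs.createReadStream('file2.txt'));
});

combinedStream.pipe(fs.createWriteStream('combined.txt'));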

The stream-stream library is an alternative that has an explicit .end, but it's much less popular and presumably not as well-tested. It uses the streams2 API of Node 0.10 (see this discussion).

Sass answered 8/5, 2013 at 19:8 Comment(2)
The combined-stream package already supports adding source streams via a callback function, so you don't have to initialize them all up front, which helps save memory, file descriptors, etc. Also, there is a much more popular library, multistream, which seems better testedTraitor
Does that work with audio (pcm/opus) streams?Wrath
16

This can now be easily done using async iterators

async function* concatStreams(readables) {
  for (const readable of readables) {
    for await (const chunk of readable) { yield chunk }
  }
} 

And you can use it like this

const fs = require('fs')
const stream = require('stream')

const files = ['file1.txt', 'file2.txt', 'file3.txt'] 
const iterable = concatStreams(files.map(f => fs.createReadStream(f)))

// convert the async iterable to a readable stream
const mergedStream = stream.Readable.from(iterable)

More info regarding async iterators: https://2ality.com/2019/11/nodejs-streams-async-iteration.html

Roryros answered 1/6, 2020 at 17:2 Comment(5)
The stream returned from createReadStream is not iterable.Impetuosity
Do you mean mergedStream? because I can iterate it without any problems gist.github.com/ducaale/5e3fd00a70487c98333e5fb42bc4b624Roryros
If order is not required, is it possible to await all so it runs in parallel and thus faster?Kingsly
Sure, you can manually get the next item from the two async-iterables via await readable.next() and take the one that resolves first.Roryros
this option is the best in my opinion, but for a TypeScript implementation you must add ... to the signature, like this: async function* concatStreams(...readables) { Dwaindwaine
8

If you don't care about the ordering of data in the streams, a simple reduce operation should be fine in nodejs!

const {PassThrough} = require('stream')

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
  s.pipe(pt, {end: false})
  s.once('end', () => a.every(s => s.readableEnded) && pt.end())
  return pt
}, new PassThrough())

Cheers ;)

Intwine answered 22/11, 2016 at 20:26 Comment(7)
Shouldn't you be returning something from reduce? This looks like joined will be undefined.Christian
WARNING: This will cause all the streams to pipe to the PassThrough stream in parallel, without any regards toward the ordering of the data, more than likely corrupting your data.Dordrecht
@LeonLi which is indeed the purpose of this approach. If you want to preserve the order you might pass an initial value different than PassThrough to your reduce function ;)Intwine
@LeonLi If you're wondering when you might not care about data ordering, I'm using this to pipe records (ie. objects) to a data store. Each record is indexed/ordered by an internal field, so I don't really care about insertion order.Artistry
@Ivo This question asks about concatenation. Most readers arriving in this QA would therefore care about ordering. This answer silently misleads those readers because the streams successfully get passed through, but unless you check the output you'll never know that it also jumbled all of your data (which the question specifically asks to avoid in the first place!). I urge you to add this information to the answer body.Dordrecht
No such thing as stream.ended. You'll have to set s.ended = true inside the end event handler.Overdone
Since writable streams don't have an 'end' event, I assume this is only meant to work for readable streams. As @Overdone said, s.ended doesn't exist. Replacing s.ended with s.readableEnded worked for me. Docs: nodejs.org/api/stream.htmlDeliciadelicious
5

In vanilla Node.js, using ES2015+ and combining the good answers of Ivo and Feng.

The PassThrough class is a trivial Transform stream which does not modify the stream in any way.

const { PassThrough } = require('stream');

const concatStreams = (streamArray, streamCounter = streamArray.length) => streamArray
  .reduce((mergedStream, stream) => {
    // pipe each stream of the array into the merged stream
    // prevent the automated 'end' event from firing
    mergedStream = stream.pipe(mergedStream, { end: false });
    // rewrite the 'end' event handler
    // Every time one of the streams ends, the counter is decremented.
    // Once the counter reaches 0, the merged stream can end, which emits its 'end' event.
    stream.once('end', () => --streamCounter === 0 && mergedStream.end());
    return mergedStream;
  }, new PassThrough());

Can be used like this:

const mergedStreams = concatStreams([stream1, stream2, stream3]);
Contrast answered 4/7, 2019 at 11:46 Comment(2)
this pipes the streams before they are done, jumbling them; this is exactly what the original question was asking about avoiding - how to concatenate, not jumble, the streams.Cestus
to avoid this, you should stream.pipe the next one after the previous one fires the 'end' eventCestus
3

You might be able to make it more concise, but here's one that works:

var util = require('util');
var EventEmitter = require('events').EventEmitter;

function ConcatStream(streamStream) {
  EventEmitter.call(this);
  var isStreaming = false,
    streamsEnded = false,
    that = this;

  var streams = [];
  streamStream.on('stream', function(stream){
    stream.pause();
    streams.push(stream);
    ensureState();
  });

  streamStream.on('end', function() {
    streamsEnded = true;
    ensureState();
  });

  var ensureState = function() {
    if(isStreaming) return;
    if(streams.length == 0) {
      if(streamsEnded)
        that.emit('end');
      return;
    }
    isStreaming = true;
    streams[0].on('data', onData);
    streams[0].on('end', onEnd);
    streams[0].resume();
  };

  var onData = function(data) {
    that.emit('data', data);
  };

  var onEnd = function() {
    isStreaming = false;
    streams[0].removeAllListeners('data');
    streams[0].removeAllListeners('end');
    streams.shift();
    ensureState();
  };
}

util.inherits(ConcatStream, EventEmitter);

We keep track of state with streams (the queue of streams; push to the back and shift from the front), isStreaming, and streamsEnded. When we get a new stream, we push it, and when a stream ends, we stop listening and shift it. When the stream of streams ends, we set streamsEnded.

On each of these events, we check the state we're in. If we're already streaming (piping a stream), we do nothing. If the queue is empty and streamsEnded is set, we emit the end event. If there is something in the queue, we resume it and listen to its events.

Note that pause and resume are advisory, so some streams may not behave correctly and would require buffering; that exercise is left to the reader.

Having done all of this, I would do the n=2 case by constructing an EventEmitter, creating a ConcatStream with it, and emitting two stream events followed by an end event. I'm sure it could be done more concisely, but we may as well use what we've got.
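
Sketched out, that n=2 usage looks something like this (stream1 and stream2 are assumed to already exist):

var emitter = new EventEmitter();
var concatenated = new ConcatStream(emitter);

// ConcatStream is a plain EventEmitter rather than a full stream,
// so consume it through its 'data' and 'end' events
concatenated.on('data', function(chunk) { /* chunks arrive in order */ });
concatenated.on('end', function() { /* both streams have finished */ });

emitter.emit('stream', stream1);
emitter.emit('stream', stream2);
emitter.emit('end');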

Jew answered 8/5, 2013 at 5:21 Comment(3)
Thanks Aaron! I was kinda hoping there'd be some existing library so I can solve it in three lines. If there isn't, I'm thinking I might extract your solution into a package. Can I use your code under an MIT license?Sass
Ah, found the stream-stream library. See my answer.Sass
@JoLiss I also looked for something first, but I failed to find that option. You can certainly use my code in a library if you still want to.Jew
3

Neither of the most upvoted answers here works with asynchronous streams, because they pipe everything at once regardless of whether the source stream is ready to produce. I had to combine in-memory string streams with a data feed from a database, and the database content always ended up at the end of the resulting stream because it takes a second to get a db response. Here's what I ended up writing for my purposes.

import { PassThrough, Readable } from 'stream';

export function joinedStream(...streams: Readable[]): Readable {
  function pipeNext(): void {
    const nextStream = streams.shift();
    if (nextStream) {
      nextStream.pipe(out, { end: false });
      nextStream.on('end', function() {
        pipeNext();
      });
    } else {
      out.end();
    }
  }
  const out = new PassThrough();
  pipeNext();
  return out;
}
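
Usage is then a one-liner; for example, with two hypothetical sources:

// the database rows only start flowing once the in-memory stream has ended
const merged = joinedStream(inMemoryStream, databaseStream);
merged.pipe(process.stdout);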
Moderation answered 21/4, 2020 at 18:38 Comment(1)
Thank you, best solution here - it's short, clear and correctLeung
2

https://github.com/joepie91/node-combined-stream2 is a drop-in Streams2-compatible replacement for the combined-stream module (described above). It automatically wraps Streams1 streams.

Example code for combined-stream2:

var CombinedStream = require('combined-stream2');
var fs = require('fs');

var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

combinedStream.pipe(fs.createWriteStream('combined.txt'));
Shorttempered answered 7/5, 2015 at 3:56 Comment(0)
1

streamee.js is a set of stream transformers and composers based on Node 0.10+ streams and includes a concatenate method:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);
Hafler answered 29/5, 2013 at 10:12 Comment(2)
Thanks, I'll check it out. That's Node 0.10 I assume?Sass
Yes Node 0.10, but you can wrap old-style streams into 0.10+ streams as written in the READMEHafler
0

The code below worked for me :). I have taken inputs from all the answers given earlier.

const { PassThrough } = require('stream')

const pipeStreams = (streams) => {
  const out = new PassThrough()
  // Pipe the first stream to the out stream
  // Also prevent the automated 'end' event of the out stream from firing
  streams[0].pipe(out, { end: false })
  for (let i = 0; i < streams.length - 2; i++) {
    // On the end of each stream (until the second last) pipe the next stream to the out stream
    // Prevent the automated 'end' event of the out stream from firing
    streams[i].on('end', () => {
      streams[i + 1].pipe(out, { end: false })
    })
  }
  // On the end of the second last stream pipe the last stream to the out stream.
  // Don't prevent the 'end' event from firing
  streams[streams.length - 2].on('end', () => {
    streams[streams.length - 1].pipe(out)
  })
  return out
}
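
A usage sketch (the three source streams are hypothetical):

const fs = require('fs');

const out = pipeStreams([stream1, stream2, stream3]);
out.pipe(fs.createWriteStream('combined.txt'));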
Impermissible answered 13/4, 2020 at 0:43 Comment(0)
0

Nisha provided my favourite solution to this problem. Some of the solutions didn't suppress the 'end' event, which caused some issues when doing audio stream merging. However, he forgot to handle the obvious case of when there is just one stream. Thank you so much for the well-thought-out solution, Nisha!

import { PassThrough, Stream } from 'stream';

const pipeStreams = (streams: Stream[]): Stream => {
    //If there is only one stream, return that stream
    if (streams.length == 1) return streams[0];
    const out = new PassThrough()
    // Piping the first stream to the out stream
    // Also prevent the automated 'end' event of out stream from firing
    streams[0].pipe(out, { end: false })
    for (let i = 0; i < streams.length - 2; i++) {
        // On the end of each stream (until the second last) pipe the next stream to the out stream
        // Prevent the automated 'end' event of out stream from firing
        streams[i].on('end', () => {
            streams[i + 1].pipe(out, { end: false })
        })
    }
    // On the end of second last stream pipe the last stream to the out stream.
    // Don't prevent the 'end' event from firing
    streams[streams.length - 2].on('end', () => {
        streams[streams.length - 1].pipe(out)
    })
    return out
}
Wivestad answered 4/7, 2022 at 19:9 Comment(0)
