How to emit/pipe array values as a readable stream in node.js?

What is the best way to create a readable stream from an array and pipe values to a writable stream? I have seen substack's example using setInterval and I can implement that successfully using 0 for the interval value, but I am iterating over a lot of data and triggering gc every time is slowing things down.

// Working with the setInterval wrapper
var Stream = require('stream');
var arr = [1, 5, 3, 6, 8, 9];

function createStream () {
    var t = new Stream();
    t.readable = true;
    var times = 0;
    // emit one array element per interval tick, then end the stream
    var iv = setInterval(function () {
        t.emit('data', arr[times]);
        if (++times === arr.length) {
            t.emit('end');
            clearInterval(iv);
        }
    }, 0);
    return t;
}

// Create the writable stream s
// ....

createStream().pipe(s);

What I would like to do is emit values without the setInterval. Perhaps using the async module like this:

async.forEachSeries(arr, function(item, cb) {
    t.emit('data', item);
    cb();
}, function(err) {
    if (err) {
        console.log(err);
    }
    t.emit('end');
});

In this case I iterate the array and emit data, but never pipe any values. I have already seen shinout's ArrayStream, but I think that was created before v0.10, and it has a bit more overhead than I am looking for.

Politicking answered 31/5, 2013 at 2:41 Comment(3)
I don't think you will be able to get much less overhead than ArrayStream (110 sloc). async is going to be similar to substack's example in its use of setImmediate. I don't think you need setImmediate/setInterval for every data event since you're not doing IO, but you will need to handle pause/resume, which ArrayStream does for you. Curious to see what answers you get.Oralee
Thanks for the input. I guess my biggest concern with ArrayStream was that it had not been updated since the changes to the Stream API in v0.10, but those worries could be unfounded. I was surprised that it had so few downloads, which makes me believe that others are doing this differently.Politicking
Please consider unaccepting the accepted answer and accepting the one recommending the standard Readable.from.Colburn

You can solve this problem by creating a readable stream and pushing values into it.

Streams are a pain, but it's often easier to work with them directly than to use libraries.

Array of strings or buffers to stream

If you're working with an array of strings or buffers, this will work:

'use strict'
const Stream = require('stream')
const readable = new Stream.Readable()

readable.pipe(process.stdout)

const items = ['a', 'b', 'c']
items.forEach(item => readable.push(item))

// no more data
readable.push(null)

Notes:

  • readable.pipe(process.stdout) does two things: puts the stream into "flowing" mode and sets up the process.stdout writable stream to receive data from readable
  • the Readable#push method is for the creator of the readable stream, not the stream consumer.
  • You have to do Readable#push(null) to signal that there is no more data.
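
As a commenter notes below, newer Node versions also expect a read() implementation on the Readable and may complain that _read() is not implemented when the consumer starts pulling. A minimal variant of the example above, assuming the same push-from-outside approach, just adds a no-op read():

'use strict'
const Stream = require('stream')

// no-op read(): data is pushed in from outside the stream,
// so there is nothing to do when the consumer asks for more
const readable = new Stream.Readable({
  read() {}
})

readable.pipe(process.stdout)

const items = ['a', 'b', 'c']
items.forEach(item => readable.push(item))

// no more data
readable.push(null)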

Array of non-strings to stream

To make a stream from an array of things that are neither strings nor buffers, you need both the readable stream and the writable stream to be in "Object Mode". In the example below, I made the following changes:

  • Initialize the readable stream with {objectMode: true}
  • Instead of piping to process.stdout, pipe to a simple writable stream that is in object mode.

'use strict'
const Stream = require('stream')

const readable = new Stream.Readable({objectMode: true})

const writable = new Stream.Writable({objectMode: true})
writable._write = (object, encoding, done) => {
  console.log(object)

  // ready to process the next chunk
  done()
}

readable.pipe(writable)

const items = [1, 2, 3]
items.forEach(item => readable.push(item))

// end the stream
readable.push(null)


Performance Note

Where is the data coming from? If it's a streaming data source, it's better to manipulate the stream using a transform stream than to convert to/from an array.
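
For illustration, here is a minimal sketch of an object-mode Transform that doubles each number flowing through it; the doubling is just a stand-in for whatever per-item work is needed, and the source/sink names in the usage comment are placeholders:

'use strict'
const Stream = require('stream')

// a Transform is both writable and readable, so it can sit in the middle
// of a pipeline and process items one at a time without building an array
const doubler = new Stream.Transform({
  objectMode: true,
  transform(chunk, encoding, done) {
    this.push(chunk * 2) // send the transformed value downstream
    done()               // ready for the next chunk
  }
})

// usage (placeholder names): someObjectModeSource.pipe(doubler).pipe(someObjectModeSink)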

Urethritis answered 27/2, 2016 at 17:3 Comment(9)
How can we chain the writable from your last example?Hatchery
@Gura, what are you trying to do? readable.pipe(writable).pipe(somethingElse)? If so, writable will have to be a Transform stream. I can add info about that if it's what you're looking for.Urethritis
I used Transform, and it works very well! Thanks @mheiberHatchery
Fantastic answer, and especially the first resource you linked to. I found it to be incredibly thorough and useful.Used
@DaveVoyles-MSFT re the first resource: it's the go-to that everyone recommends, but I found the Node docs themselves invaluable for figuring out everything I needed. In particular, I tend to need transform streams and the handbook doesn't cover those.Urethritis
This might be missing an implementation of _read() on the Readable stream. See here for a similar answer: https://mcmap.net/q/108319/-how-to-create-streams-from-string-in-node-js (strings to streams).Madame
I'm pretty sure the readable.push() call needs to be checking whether the stream's internal buffer is full, no? Otherwise, there's a risk that some elements in the array don't make it into the stream.Donnenfeld
Is it important to look at the return value of push(), and to stop push()ing if there is backpressure?Meir
@Meir that sounds important. Would you be up for editing the answer if you find a good way to do this?Urethritis

As of Node 12.3, you can use stream.Readable.from(iterable, [options]) instead.

const { Readable } = require("stream");

const arr = [1, 5, 3, 6, 8, 9];
const readableStream = Readable.from(arr);

readableStream.on("data", (row) => console.log(row));

It also works with objects:

const arr = [
  { index: 1, hello: { foo: "bar" } },
  { index: 2, hello: { foo: "bar" } },
  { index: 3, hello: { foo: "bar" } },
];
const readableStream = Readable.from(arr);
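
Continuing the snippet above, the stream can also be consumed with async iteration instead of 'data' events; a minimal sketch:

// Readable streams are async iterable in the Node versions that ship Readable.from
(async () => {
  for await (const row of readableStream) {
    console.log(row.index); // 1, 2, 3
  }
})();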
Scarabaeid answered 28/2, 2020 at 21:32 Comment(2)
It should work for an Iterable -- do you have an example of it not working for arrays of objects?Scarabaeid
Oh, you're right. I had tried it in something I was working on and it didn't work. I assumed it was because the Readable wasn't in objectMode. But I just created a simple example and it worked. I'll delete my comment.Hatchway

tl;dr

This is a LIFO solution: Array.prototype.pop() behaves like shift(), but takes items from the end of the array, so values are emitted in reverse order.

const { Readable } = require('stream')

const items = [1, 2, 3]
const stream = new Readable({
  objectMode: true,
  // read() is only called when the consumer wants more data,
  // so pushing from here naturally respects backpressure
  read() {
    const item = items.pop()
    if (!item) {
      // array exhausted (note: a falsy item such as 0 would also end the stream here)
      this.push(null);
      return;
    }
    this.push(item)
  },
})
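
If FIFO order is wanted instead (see the comments below), a minimal variation shifts items off the front of the array and checks the remaining length, so a falsy value such as 0 does not end the stream early:

const { Readable } = require('stream')

const items = [1, 2, 3]
const stream = new Readable({
  objectMode: true,
  read() {
    if (items.length === 0) {
      this.push(null) // nothing left: end the stream
      return
    }
    this.push(items.shift()) // emit items in their original order
  },
})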
Kos answered 19/6, 2018 at 19:6 Comment(3)
Should we not use shift() instead of pop(), to send the array in order?Gosnell
This is a LIFO solution. Array.prototype.pop() has similar behavior to shift but applied to the last element in an array.Kos
Would you edit the answer to clarify that? Because all the other answers, and indeed my own expectation coming into this question, are around FIFO.Gosnell

I wound up using ArrayStream for this. It did resolve the issue with the GC being triggered too often. I was getting warnings from node about a recursive process.nextTick, so I changed the nextTick callbacks in ArrayStream to setImmediate; that fixed the warnings, and it seems to be working well.
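
For anyone making the same change, the substitution is roughly the following (illustrative only; the real call sites are inside ArrayStream's source, and callback stands for whatever function was being scheduled):

// before: process.nextTick queues the callback to run before the event
// loop continues, and node warns when this recurses deeply
process.nextTick(callback);

// after: setImmediate defers the callback to the next turn of the event
// loop, so deep recursion no longer triggers the warning
setImmediate(callback);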

Politicking answered 28/6, 2013 at 14:40 Comment(0)

It's an old question, but if anyone stumbles on this, node-stream-array is a much simpler and more elegant implementation for Node.js >= v0.10:

var streamify = require('stream-array'),
  os = require('os');

streamify(['1', '2', '3', os.EOL]).pipe(process.stdout);
Fibrovascular answered 16/9, 2015 at 9:27 Comment(2)
Unfortunately, node-stream-array does not return a true Readable, and that may cause problems with other parts of the code (ex: missing destroy function when used with promisepipe)Kevinkevina
Just ran into the "not a Readable" issue that Ludovic commented about. Attempting "for await (const chunk of stream)" results in the following error: "TypeError: stream is not async iterable"Pointer
