How do I implement a basic node Stream.Readable example?
Asked Answered
G

2

12

I'm trying to learn streams and am having a little bit of an issue getting it to work right.

For this example, I'd simply like to push a static object to the stream and pipe that to my servers response.

Here's what I have so far, but a lot of it doesn't work. If I could even just get the stream to output to console, I can figure out how to pipe it to my response.

var Readable = require('stream').Readable;

var MyStream = function(options) {
  Readable.call(this);
};

MyStream.prototype._read = function(n) {
  this.push(chunk);
};

var stream = new MyStream({objectMode: true});
s.push({test: true});

request.reply(s);
Girvin answered 20/12, 2013 at 17:37 Comment(0)
V
10

There are a couple of issues with your current code.

  1. The request stream is most likely a buffer mode stream: this means that you can't write objects into it. Fortunately, you don't pass through the options to the Readable constructor so your mistake doesn't cause any trouble, but semantically this is wrong and will not produce the expected results.
  2. You call the constructor of Readable, but don't inherit the prototype properties. You should use util.inherits() to subclass Readable.
  3. The chunk variable isn't defined anywhere in your code sample.

Here is a working example:

var util = require('util');
var Readable = require('stream').Readable;

var MyStream = function(options) {
  Readable.call(this, options); // pass through the options to the Readable constructor
  this.counter = 1000;
};

util.inherits(MyStream, Readable); // inherit the prototype methods

MyStream.prototype._read = function(n) {
  this.push('foobar');
  if (this.counter-- === 0) { // stop the stream
    this.push(null);
  }
};

var mystream = new MyStream();
mystream.pipe(process.stdout);
Viborg answered 20/12, 2013 at 17:52 Comment(6)
I'm confused about how to push stuff to the stream. I see you're pushing foobar, but I need to push stuff to the mystream instance. How do I do that?Girvin
Did you run the code? The this.push() function call means push the data to reading queue. This means that everything you push will be available for the consumers of the stream (here, this is the process.stdout stream).Viborg
You should also read the Stream Handbook, especially the creating a readable stream section.Viborg
Because the object's prototype is stream.Readable, yesViborg
How does this readable stream get any data? It's unclear. It would be better to implement the _read method to read data from a file.Better
Readable streams are the most non-intuitive thing in the node.js world.Better
D
0

The following is a variation of Paul Mougel's answer using ES6. It implements a countdown stream from a positive counter down to 0 (the counter defaults to 1000). Then we create a stream counter of 100, which is piped into the writable stream process.stdout:

const { Readable } = require('stream');

class CountdownStream extends Readable {
  constructor(counter, options) {
    super(options);
    this._counter = (!isNaN(counter) && counter > 0) ? counter : 1000;
  }

  _read(size) {
    if (this._counter == 0) {
      this.push(this._counter.toString());
      this.push(null);
    }
    else {
      this.push(this._counter + "\n");
    }
    this._counter -= 1;
  }
}

const counter = new CountdownStream(100);
counter.pipe(process.stdout);
Discomposure answered 11/6, 2023 at 20:31 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.