How to call an asynchronous function inside a Node.js readable stream
This is a short example of a custom readable stream implementation. The class is called MyStream. The stream reads the file and folder names from a directory and pushes the values to the data event.

To compare, I implemented (in this example) two different approaches. One is synchronous and the other is asynchronous. The second argument of the constructor lets you decide which one is used (true for asynchronous, false for synchronous).

The readcounter counts the number of times the method _read is called, just as feedback.

var Readable = require('stream').Readable;
var util = require('util');
var fs = require('fs');
util.inherits(MyStream, Readable);

function MyStream(dirpath, async, opt) {
  Readable.call(this, opt);
  this.async = async;
  this.dirpath = dirpath;
  this.counter = 0;
  this.readcounter = 0;
}

MyStream.prototype._read = function() {
  this.readcounter++;
  if (this.async === true){
    console.log("Readcounter: " + this.readcounter);
    var that = this;
    fs.readdir(this.dirpath,function(err, files){
      that.counter ++;
      console.log("Counter: " + that.counter);
      for (var i = 0; i < files.length; i++){
        that.push(files[i]);
      }
      that.push(null);
    });
  } else {
    console.log("Readcounter: " + this.readcounter);
    var files = fs.readdirSync(this.dirpath);
    for (var i = 0; i < files.length; i++){
      this.push(files[i]);
    }
    this.push(null);
  }
};
// Instance for an asynchronous call
var mystream = new MyStream('C:\\Users', true);
mystream.on('data', function(chunk){
  console.log(chunk.toString());
});

The synchronous way works as expected, but something interesting happens when I call it asynchronously. Every time a filename is pushed via that.push(files[i]), the _read method is called again. This causes errors once the first asynchronous loop has finished and that.push(null) has marked the end of the stream.

The environment I am using to test this: Node 4.1.1, Electron 0.35.2.

I do not understand why _read is called so often and why this is happening. Maybe it is a bug? Or is there something I do not see at the moment? Is there a way to build a readable stream using asynchronous functions? Pushing the chunks asynchronously would be really useful, because it would be the non-blocking stream way, especially with larger amounts of data.

Digitigrade answered 3/12, 2015 at 13:24 Comment(0)
_read is called whenever the "reader" needs data, and that usually happens just after you push data.

I had the same sort of "issues" implementing _read directly, so now I write a function that returns a stream object. It works quite well, and data can't be "pulled" from my stream; data is available/pushed when I decide. With your example, I would do it like this:

var Readable = require('stream').Readable;
var fs = require('fs');

function MyStream(dirpath, async, opt) {
  var rs = new Readable();
  // needed to avoid "Not implemented" exception
  rs._read = function() { 
    // console.log('give me data!'); // << this will print after every console.log(folder);
  };

  var counter = 0;
  var readcounter = 0;

  if (async) {
    console.log("Readcounter: " + readcounter);
    fs.readdir(dirpath, function (err, files) {
      counter++;
      console.log("Counter: " + counter);
      for (var i = 0; i < files.length; i++) {
        rs.push(files[i]);
      }
      rs.push(null);
    });
  } else {
    console.log("Readcounter: " + readcounter);
    var files = fs.readdirSync(dirpath);
    for (var i = 0; i < files.length; i++) {
      rs.push(files[i]);
    }
    rs.push(null);
  }

  return rs;
}

var mystream = MyStream('C:\\Users', true);
mystream.on('data', function (chunk) {
  console.log(chunk.toString());
});

It doesn't directly answer your question, but it's a way to get working code.

Circumnavigate answered 3/12, 2015 at 14:23 Comment(5)
In this case the _read method is not called every time data is pushed. If you use the same class just with the synchronous readdir, the counter stays at 1. Your working code only uses the Readable stream class but doesn't create a new stream class. But I will try to work with the async function outside the _read method.Digitigrade
rs is a stream; the difference is that you're extending Readable. When you do that, you must call push() only once in any path of execution in _read. Your code can work if you store files at the object level (this.files) and keep an index to know which element you should push next. I can post a "fixed" version of your code if you want.Circumnavigate
No, you don't need to fix the code; it is just an example. The suggestion with the index works with my example, but what if the function takes some time, returns 10,000 values and needs some CPU resources? Then it is not optimal to call _read 10,000 times and filter just one value out of each call.Digitigrade
Well, that's how streams work; it's a flow of data, and _read is supposed to be called by the consumer when it's ready to consume more data. Depending on your data source, streams can be very performant. A quick example: using fs.createReadStream() to read a 16-million-line CSV (2 GB in size; most text editors can't even open this file) and a package to get a specific line, the 10000th line is retrieved in less than 30 ms.Circumnavigate
This does not follow the requirement to stop pushing when push returns false. According to the docs: "_read() may continue reading from the resource and pushing data until readable.push() returns false. Only when _read() is called again after it has stopped should it resume pushing additional data into the queue."Nowise
Fixed since Node 10

https://github.com/nodejs/node/issues/3203

If my understanding is correct, before Node 10 an async _read() implementation had to call this.push() only once with data and maintain its own buffer in order to delay the following this.push() calls to the next _read() call.

const {Readable} = require('stream');
let i = 0;
const content_length = 5;
let content_read = 0;

const stream = new Readable({
  encoding: 'utf8',
  read() {
    console.log('read', ++i);
    const icopy = i;
    setTimeout(() => {
      for (let a=1; a<=3; a++) {
        this.push(icopy+':'+a);
      }
      content_read++;
      if (content_read == content_length) {
        console.log('close');
        this.push(null);
      }
    }, Math.floor(Math.random()*1000));
  },
});

stream.on('data', (data) => {
  console.log(data);
});

Node 8.17.0:

read 1
1:1
read 2
1:2
read 3
1:3
read 4
2:1
read 5
2:2
read 6
2:3
read 7
6:1
read 8
6:2
read 9
6:3
read 10
9:1
read 11
9:2
read 12
9:3
read 13
12:1
read 14
12:2
read 15
12:3
read 16
close
events.js:183
      throw er; // Unhandled 'error' event
      ^

Error: stream.push() after EOF

Node 10.24.1:

read 1
1:1
1:2
1:3
read 2
2:1
2:2
2:3
read 3
3:1
3:2
3:3
read 4
4:1
4:2
4:3
read 5
5:1
5:2
5:3
close
Exoenzyme answered 2/7, 2021 at 15:24 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.