_read() is not implemented on Readable stream
Asked Answered
A

4

27

This question is how to really implement the read method of a readable stream.

I have this implementation of a Readable stream:

import {Readable} from "stream";
this.readableStream = new Readable();

I am getting this error

events.js:136 throw er; // Unhandled 'error' event ^

Error [ERR_STREAM_READ_NOT_IMPLEMENTED]: _read() is not implemented at Readable._read (_stream_readable.js:554:22) at Readable.read (_stream_readable.js:445:10) at resume_ (_stream_readable.js:825:12) at _combinedTickCallback (internal/process/next_tick.js:138:11) at process._tickCallback (internal/process/next_tick.js:180:9) at Function.Module.runMain (module.js:684:11) at startup (bootstrap_node.js:191:16) at bootstrap_node.js:613:3

The reason the error occurs is obvious, we need to do this:

  this.readableStream = new Readable({
      read(size) {
        return true;
      }
    });

I don't really understand how to implement the read method though.

The only thing that works is just calling

this.readableStream.push('some string or buffer');

if I try to do something like this:

   this.readableStream = new Readable({
          read(size) {
            this.push('foo');   // call push here!
            return true;
          }
     });

then nothing happens - nothing comes out of the readable!

Furthermore, these articles says you don't need to implement the read method:

https://github.com/substack/stream-handbook#creating-a-readable-stream

https://medium.freecodecamp.org/node-js-streams-everything-you-need-to-know-c9141306be93

My question is - why does calling push inside the read method do nothing? The only thing that works for me is just calling readable.push() elsewhere.

Alp answered 16/3, 2018 at 9:53 Comment(0)
S
22

why does calling push inside the read method do nothing? The only thing that works for me is just calling readable.push() elsewhere.

I think it's because you are not consuming it, you need to pipe it to an writable stream (e.g. stdout) or just consume it through a data event:

const { Readable } = require("stream");

let count = 0;
const readableStream = new Readable({
    read(size) {
        this.push('foo');
        if (count === 5) this.push(null);
        count++;
    }
});

// piping
readableStream.pipe(process.stdout)

// through the data event
readableStream.on('data', (chunk) => {
  console.log(chunk.toString());
});

Both of them should print 5 times foo (they are slightly different though). Which one you should use depends on what you are trying to accomplish.

Furthermore, these articles says you don't need to implement the read method:

You might not need it, this should work:

const { Readable } = require("stream");

const readableStream = new Readable();

for (let i = 0; i <= 5; i++) {
    readableStream.push('foo');
}
readableStream.push(null);

readableStream.pipe(process.stdout)

In this case you can't capture it through the data event. Also, this way is not very useful and not efficient I'd say, we are just pushing all the data in the stream at once (if it's large everything is going to be in memory), and then consuming it.

Selfacting answered 16/3, 2018 at 17:21 Comment(4)
I have always been consuming it, with readable.on('data', function(){});Alp
I expanded the explanation, if you want to do it like that you need to implement the read method indeed!Selfacting
try your second example - it should throw an error saying that you need to implement _read()...Alp
It's working for me, are you closing it pushing null?Selfacting
L
5

From documentation:

readable._read:

"When readable._read() is called, if data is available from the resource, the implementation should begin pushing that data into the read queue using the this.push(dataChunk) method. link"

readable.push:

"The readable.push() method is intended be called only by Readable implementers, and only from within the readable._read() method. link"

Ladle answered 10/3, 2019 at 8:50 Comment(0)
H
0

Implement the _read method after your ReadableStream's initialization:

import {Readable} from "stream";
this.readableStream = new Readable();
this.readableStream.read = function () {};
Hussite answered 5/3, 2019 at 16:0 Comment(0)
Z
0

readableStream is like a pool:

  • .push(data), It's like pumping water to a pool.
  • .pipe(destination), It's like connecting the pool to a pipe and pump water to other place
  • The _read(size) run as a pumper and control how much water flow and when the data is end.

The fs.createReadStream() will create read stream with the _read() function has been auto implemented to push file data and end when end of file.

The _read(size) is auto fire when the pool is attached to a pipe. Thus, if you force calling this function without connect a way to destination, it will pump to ?where? and it affect the machine status inside _read() (may be the cursor move to wrong place,...)

The read() function must be create inside new Stream.Readable(). It's actually a function inside an object. It's not readableStream.read(), and implement readableStream.read=function(size){...} will not work.

The easy way to understand implement:

var Reader=new Object();
Reader.read=function(size){
    if (this.i==null){this.i=1;}else{this.i++;}
    this.push("abc");
    if (this.i>7){ this.push(null); }
}

const Stream = require('stream');
const renderStream = new Stream.Readable(Reader);

renderStream.pipe(process.stdout)

You can use it to reder what ever stream data to POST to other server. POST stream data with Axios :

require('axios')({
    method: 'POST',
    url: 'http://127.0.0.1:3000',
    headers: {'Content-Length': 1000000000000},
    data: renderStream
});
Zamindar answered 9/10, 2021 at 15:56 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.