How to close a readable stream (before end)?
Asked Answered

10

134

How to close a readable stream in Node.js?

var fs = require('fs');
var input = fs.createReadStream('lines.txt');

input.on('data', function(data) {
   // after closing the stream, this will not
   // be called again

   if (gotFirstLine) {
      // close this stream and continue the
      // instructions from this if
      console.log("Closed.");
   }
});

This would be better than:

input.on('data', function(data) {
   if (isEnded) { return; }

   if (gotFirstLine) {
      isEnded = true;
      console.log("Closed.");
   }
});

But this would not stop the reading process...

Breaux answered 9/10, 2013 at 16:4 Comment(3)
Warning: This question is only in the context of the fs module. close does not exist in Stream.Readable. – Galenical
Good news: Node version 8 provides stream.destroy(). – Norris
Can't you call readable.push(null) && readable.destroy()? – Filter
46

Invoke input.close(). It's not in the docs, but

https://github.com/joyent/node/blob/cfcb1de130867197cbc9c6012b7e84e08e53d032/lib/fs.js#L1597-L1620

clearly does the job :) It actually does something similar to your isEnded.

EDIT 2015-Apr-19 Based on comments below, and to clarify and update:

  • This suggestion is a hack, and is not documented.
  • Though, looking at the current lib/fs.js, it still works more than 1.5 years later.
  • I agree with the comment below about calling destroy() being preferable.
  • As correctly stated below, this works for fs ReadStreams, not for a generic Readable.

As for a generic solution: it doesn't appear as if there is one, at least from my understanding of the documentation and from a quick look at _stream_readable.js.

My proposal would be to put your readable stream in paused mode, at least preventing further processing in your upstream data source. Don't forget to unpipe() and remove all data event listeners so that pause() actually pauses, as mentioned in the docs (see the sketch below).
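
A minimal sketch of that pause-and-detach approach, assuming input is the fs ReadStream from the question (unpipe() with no argument detaches every piped destination):

input.unpipe();                   // detach any piped destinations
input.removeAllListeners('data'); // with 'data' listeners attached, pause() may not stick
input.pause();                    // the stream stays open but stops emitting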

Mauretta answered 9/10, 2013 at 16:17 Comment(5)
Actually I would prefer calling destroy instead. At least that's what is called if you set autoClose to true. By looking at the source code (today) the differences are minimal (destroy calls close), but that could change in the future. – Ebb
Don't remember by now, but looks like it :) – Mauretta
There is no close() on the Readable object; is there a newer solution? My data exchange is always incomplete... – Riley
Updated to clarify, address comments, and provide a (poor man's) suggestion for the generic case. Though it does make some sense not to force a generic Readable to implement close(), and to provide a class-specific way of doing this instead (as is the case in fs, and presumably other classes implementing Readable). – Mauretta
Won't pausing cause the upstream (sender) to block due to backpressure, or otherwise cause buffers to grow until they exceed their limits? Ideally we would tell the sender that it is no longer wanted... – Norris
121

Edit: Good news! Starting with Node.js 8.0.0 readable.destroy is officially available: https://nodejs.org/api/stream.html#stream_readable_destroy_error

ReadStream.destroy

You can call the ReadStream.destroy function at any time.

var fs = require("fs");

var readStream = fs.createReadStream("lines.txt");
readStream
    .on("data", function (chunk) {
        console.log(chunk);
        readStream.destroy();
    })
    .on("end", function () {
        // This may not been called since we are destroying the stream
        // the first time "data" event is received
        console.log("All the data in the file has been read");
    })
    .on("close", function (err) {
        console.log("Stream has been destroyed and file has been closed");
    });

The public function ReadStream.destroy is not documented (Node.js v0.12.2) but you can have a look at the source code on GitHub (Oct 5, 2012 commit).

The destroy function internally marks the ReadStream instance as destroyed and calls the close function to release the file.

You can listen to the close event to know exactly when the file is closed. The end event will not fire unless the data is completely consumed.


Note that the destroy (and close) functions are specific to fs.ReadStream. They are not part of the generic stream.Readable interface.

Regolith answered 22/4, 2015 at 18:28 Comment(5)
At least in the latest version of Node (haven't checked the others), the file descriptor is closed automatically. That said, I haven't done any kind of thorough test to ensure that the stream eventually fires error if it is never read. Aside from that, the only other leak I'd worry about is event handlers – once again, I'm not 100% sure on this, but we might be OK because the 2010 gospel of Isaacs does say that handlers are pruned when emitters are GC'd: groups.google.com/d/msg/nodejs/pXbJVo0NtaY/BxUmF_jp9LkJ – Limoges
If the data is too small, on('data') will only trigger once, so there will not be any close() – just a reminder for anyone else. – Truitt
You can actually use this.destroy(), unless you're using an arrow function. Lexical this, I hate you :D – Principal
I'm using pipe() and I had to move on("close") to be before .pipe() and on("data"), otherwise I wasn't able to catch the "close" event. – Peipus
@MaximMazurok you are a life saver, brother, this had me spinning for hours. Thanks! – Ichnology
28

Today, in Node 10

readableStream.destroy()

is the official way to close a readable stream.

See https://nodejs.org/api/stream.html#stream_readable_destroy_error
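
For completeness, readable.destroy() also accepts an optional error. A minimal sketch, assuming readableStream is any Readable on Node 8+:

readableStream.on('error', (err) => {
    // destroy(err) makes the stream emit the error it was given
    console.error('stream torn down:', err.message);
});
readableStream.destroy(new Error('no longer needed')); // or readableStream.destroy() to tear down silently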

Timbrel answered 3/6, 2018 at 12:40 Comment(0)
14

You can't. There is no documented way to close/shutdown/abort/destroy a generic Readable stream as of Node 5.3.0. This is a limitation of the Node stream architecture.

As other answers here have explained, there are undocumented hacks for specific implementations of Readable provided by Node, such as fs.ReadStream. These are not generic solutions for any Readable though.

If someone can prove me wrong here, please do. I would like to be able to do what I'm saying is impossible, and would be delighted to be corrected.

EDIT: Here was my workaround: implement .destroy() for my pipeline through a complex series of unpipe() calls. And after all that complexity, it doesn't work properly in all cases.

EDIT: Node v8.0.0 added a destroy() api for Readable streams.

Perspicuous answered 31/12, 2015 at 14:59 Comment(1)
There’s now stream.pipeline, which claims to handle “forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.” Does that help? – Virile
14

As of version 4.x, pushing a null value into the stream will trigger an EOF signal.

From the Node.js docs:

If a value other than null is passed, the push() method adds a chunk of data into the queue for subsequent stream processors to consume. If null is passed, it signals the end of the stream (EOF), after which no more data can be written.

This worked for me after trying numerous other options on this page.
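
A minimal sketch of that idea, using a small custom Readable (a hypothetical counter source) to show that pushing null signals EOF and ends the stream:

const { Readable } = require('stream');

let count = 0;
const source = new Readable({
    read() {
        if (count >= 3) {
            this.push(null);            // signal EOF: no more data will follow
        } else {
            this.push(String(count++)); // queue the next chunk
        }
    }
});

source.on('data', (chunk) => console.log('got', chunk.toString()));
source.on('end', () => console.log('stream ended after pushing null'));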

Alba answered 21/4, 2016 at 6:19 Comment(1)
Works for me. However, I needed to avoid calling the done() callback after pushing null to get the expected behavior – namely, that the entire stream halts. – Shuttering
7

This destroy module is meant to ensure a stream gets destroyed, handling different APIs and Node.js bugs. Right now it is one of the best choices.

NB. From Node 10 you can use the .destroy method without further dependencies.
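
Usage is a one-liner. A minimal sketch, assuming the destroy package is installed and readStream is an fs ReadStream:

const destroy = require('destroy'); // npm install destroy
const fs = require('fs');

const readStream = fs.createReadStream('lines.txt');
destroy(readStream); // tears the stream down, whatever kind of stream it is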

Fluellen answered 10/6, 2017 at 18:44 Comment(0)
3

You can clear and close the stream with yourstream.resume(), which will dump everything on the stream and eventually close it.

From the official docs:

readable.resume():

Return: this

This method will cause the readable stream to resume emitting 'data' events.

This method will switch the stream into flowing mode. If you do not want to consume the data from a stream, but you do want to get to its 'end' event, you can call stream.resume() to open the flow of data.

var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', () => {
  console.log('got to the end, but did not read anything');
});
Froissart answered 4/4, 2016 at 13:33 Comment(2)
This can be called "draining" the stream. In our case, of course, we had a 'data' event listener, but we made it check a boolean, if (!ignoring) { ... }, so it won't process data while we are draining the stream: ignoring = true; readable.resume(); – Norris
Of course this assumes the stream will 'end' at some point. Not all streams will do that! (E.g. a stream that sends the date every second, forever.) – Norris
3

It's an old question, but I too was looking for the answer and found the one that worked best for my implementation. Both the end and close events get emitted, so I think this is the cleanest solution.

This will do the trick in node 4.4.* (stable version at the time of writing):

var fs = require('fs');
var input = fs.createReadStream('lines.txt');

input.on('data', function(data) {
   if (gotFirstLine) {
      this.end(); // Simple isn't it?
      console.log("Closed.");
   }
});

For a very detailed explanation see: http://www.bennadel.com/blog/2692-you-have-to-explicitly-end-streams-after-pipes-break-in-node-js.htm

Butanone answered 7/6, 2016 at 14:39 Comment(0)
2

This code here will do the trick nicely:

function closeReadStream(stream) {
    if (!stream) return;
    if (stream.close) stream.close();
    else if (stream.destroy) stream.destroy();
}

writeStream.end() is the go-to way to close a writeStream...
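
For the write side mentioned here, a minimal sketch, assuming writeStream is an fs WriteStream:

writeStream.end('last chunk\n', function () {
    console.log('write stream finished: all data flushed');
});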

Galleass answered 2/5, 2017 at 5:12 Comment(2)
Why do you mention .end() is the go-to way, but then your code uses close and destroy and doesn't even use end? – Wholly
I am closing a readStream in the example... for a writeStream, use .end. – Galleass
0

To stop callback execution after a certain point, you can use process.kill with the particular process ID:

const csv = require('csv-parser');
const fs = require('fs');

const filepath = "./demo.csv"
let readStream = fs.createReadStream(filepath, {
    autoClose: true,
});
let MAX_LINE = 0;


readStream.on('error', (e) => {
        console.log(e);
        console.log("error");
    })

    .pipe(csv())
    .on('data', (row) => {

        if (MAX_LINE == 2) {
            process.kill(process.pid, 'SIGTERM')
        }
        // console.log("not 2");
        MAX_LINE++
        console.log(row);
    })

    .on('end', () => {
        // handle end of CSV
        console.log("read done");
    }).on("close", function () {
        console.log("closed");
    })
Duplicature answered 20/12, 2021 at 9:44 Comment(0)
