What is the correct way to pause a piped readable stream from a writable one in Node.js?
A

3

18

I am writing a module that is a writable stream, and I want to support the pipe interface for my users.

If an error happens, I need to pause the readable stream and emit an error event. The user then decides: if they are OK with the error, they should be able to resume data processing.

var writeable = new BackPressureStream();
writeable.on('error', function(error){
    console.log(error);
    writeable.resume();
});

var readable = require('fs').createReadStream('somefile.txt');
readable.pipe(writeable);

I see that Node provides the readable.pause() method, which can be used to pause a readable stream. But I don't see how I can call it from my writable stream module:

var Writable = require('stream').Writable;

function BackPressureStream(options) {
    Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    done();
};

BackPressureStream.prototype.resume = function() {
    this.emit('drain');
}

How can back pressure be implemented in a writable stream?

P.S. It is possible to use the pipe/unpipe events, which provide the readable stream as a parameter. But it is also said that, for piped streams, the only way to pause is to unpipe the readable stream from the writable one.

Did I get it right? Do I have to unpipe my writable stream until the user calls resume, and then pipe the readable stream back once the user has called resume?

Alterable answered 28/10, 2015 at 8:29 Comment(2)
Interested in starting a bounty for this one? - Artimas
Hey, did you find an answer to your question? - Gelb
P
2

What you are describing is already implemented by the pipe method itself. From the "Errors While Writing" section of the docs:

If a Readable stream pipes into a Writable stream when Writable emits an error, the Readable stream will be unpiped.

So, as the implementer of a writable stream, your only job is to implement the _write method and emit an error when one occurs. Unpiping is handled automatically by the stream module. It is then up to the consumers of your module to pipe the readable stream back if they consider the error non-critical. Here is how they could do that:

var writeable = new BackPressureStream();
var readable = require('fs').createReadStream('somefile.txt');

writeable.on('error', function(error) {
    // use pipe again, if error is not critical
    if (!error.critical) {
        readable.pipe(writeable);
    } else {
        readable.destroy(error);
    }
});

readable.pipe(writeable);

And inside your module:

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    // call done with an error to emit 'error' event and unpipe readable stream
    done(new Error('BOOM'));
};
Picaresque answered 10/8, 2018 at 12:8 Comment(0)
C
1

There is no need to access or interact with the source stream. Native Node.js streams support back pressure and buffering, and pipe() takes care of both.

You only need to implement _write() properly.

function _write(chunk, enc, callback) {
    // if you don't invoke callback, data is buffered, and writes paused when buffer is full
}

Quoting the docs:

All calls to writable.write() that occur between the time writable._write() is called and the callback is called will cause the written data to be buffered.

After forwarding the error, simply do not call callback() for the next chunk until the user confirms that processing should proceed. This causes data from the source to be buffered.
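As a rough sketch of this idea (not the asker's actual module), the writable below holds back the callback of a failed chunk until resume() is called. The class name HoldOnFailureWritable, the handler argument, and the 'problem' event name are all assumptions made for illustration. A custom event name is used here because pipe() unpipes the source as soon as the destination emits 'error'.

```javascript
const { Writable } = require('stream');

// Hypothetical writable: `handler` is a user-supplied function that may throw.
class HoldOnFailureWritable extends Writable {
  constructor(handler, options) {
    super(options);
    this.handler = handler;
    this.heldCallback = null; // callback of the chunk that failed, if any
  }

  _write(chunk, encoding, callback) {
    try {
      this.handler(chunk);
    } catch (err) {
      // Keep the callback: no further _write calls happen until it is invoked.
      this.heldCallback = callback;
      this.emit('problem', err); // custom event name, an assumption
      return;
    }
    callback();
  }

  // Called by the user to skip the failed chunk and continue.
  resume() {
    const callback = this.heldCallback;
    this.heldCallback = null;
    if (callback) callback();
  }
}

const processed = [];
const problems = [];
const sink = new HoldOnFailureWritable((chunk) => {
  const text = chunk.toString();
  if (text === 'bad') throw new Error('cannot process ' + text);
  processed.push(text);
});
sink.on('problem', (err) => problems.push(err.message));

sink.write('ok');
sink.write('bad');   // fails: the callback is held, the stream is now stalled
sink.write('later'); // buffered inside the writable, not processed yet
const processedWhileStalled = processed.slice();

sink.resume();       // releases the held callback; buffered data is processed
```

In this sketch the failed chunk is skipped on resume(); a module that should retry it instead would need to store the chunk alongside the callback.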

Data is buffered in Writable streams when the writable.write(chunk) method is called repeatedly. While the total size of the internal write buffer is below the threshold set by highWaterMark, calls to writable.write() will return true. Once the size of the internal buffer reaches or exceeds the highWaterMark, false will be returned.

Once the writable stream's buffer is full, calls to write() return false. If the source stream is a native Node.js stream or an otherwise well-behaved implementation, it will automatically stop calling write() with more data.
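The return values described above can be observed with a small experiment. This is only a sketch: the tiny highWaterMark of 4 bytes and the variable names are chosen for illustration, and the write implementation deliberately withholds its callback.

```javascript
const { Writable } = require('stream');

let pendingCallback = null;

// Writable whose write implementation withholds the callback, as if paused.
const stalled = new Writable({
  highWaterMark: 4, // bytes; deliberately tiny for the demonstration
  write(chunk, encoding, callback) {
    pendingCallback = callback; // not invoked yet
  },
});

const first = stalled.write('ab');  // 2 bytes pending, below the threshold
const second = stalled.write('cd'); // 4 bytes pending, threshold reached
const pendingBytes = stalled.writableLength;

console.log(first, second, pendingBytes); // true false 4
```

A source connected through pipe() reacts to that false by pausing until the writable emits 'drain', which happens only after the withheld callbacks have been invoked and the buffer has emptied.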

Crab answered 11/4, 2018 at 12:55 Comment(0)
F
0

Basically, as I understand it, you are looking to put backpressure on the stream in the case of an error event. You have a couple of options.

First, as you have already identified, you can use the pipe event to grab a reference to the read stream and do some fancy footwork.
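A minimal sketch of what that footwork could look like, assuming the writable keeps track of its sources through the 'pipe' and 'unpipe' events. The class name SourceTrackingWritable and the pauseSources/resumeSources methods are hypothetical names, not part of the Node.js API.

```javascript
const { Writable, PassThrough } = require('stream');

// Hypothetical writable that remembers which readables are piped into it.
class SourceTrackingWritable extends Writable {
  constructor(options) {
    super(options);
    this.sources = new Set();  // readables currently piped into this stream
    this.detached = new Set(); // readables unpiped by pauseSources()
    this.on('pipe', (src) => this.sources.add(src));
    this.on('unpipe', (src) => this.sources.delete(src));
  }

  _write(chunk, encoding, callback) {
    callback(); // real processing would go here
  }

  // Detach every source so that no more data arrives.
  pauseSources() {
    for (const src of [...this.sources]) {
      this.detached.add(src);
      src.unpipe(this);
    }
  }

  // Re-attach the sources that pauseSources() detached.
  resumeSources() {
    for (const src of [...this.detached]) {
      this.detached.delete(src);
      src.pipe(this);
    }
  }
}

const source = new PassThrough();
const sink = new SourceTrackingWritable();

source.pipe(sink);
sink.pauseSources();
const attachedWhilePaused = sink.sources.size;

sink.resumeSources();
const attachedAfterResume = sink.sources.size;
```

The design choice here is that the writable, not the user, owns the unpipe and re-pipe calls, so the user only ever calls methods on the writable.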

Another option is to create a wrapping writable stream that provides this functionality (i.e. it takes a writable stream as input and, in its own stream methods, passes the data along to the supplied stream).

Basically you end up with something like

source stream -> wrapping writable -> writable

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream deals with implementing a writable stream.

The key for you is that if an error occurs in the underlying writable, you would set a flag on the stream. On the next call to write, you would buffer the chunk and store the callback, and only pass them on once the user has recovered from the error. Something like

// ...
constructor(wrappedWritableStream, options) {
    super(options); // assumes the class extends stream.Writable
    this.wrappedWritableStream = wrappedWritableStream;
    this.hadError = false;
    this.bufferedChunk = undefined;
    // Bind the handler so that `this` refers to the wrapper inside it.
    wrappedWritableStream.on('error', this.errorHandler.bind(this));
}
// ...
_write(chunk, encoding, callback) {
    if (this.hadError) {
        // Note: until callback is called, this function won't be called again, so we will have at most one stored
        //  chunk.
        this.bufferedChunk = [chunk, encoding, callback];
    } else {
        this.wrappedWritableStream.write(chunk, encoding, callback);
    }
}
// ...
errorHandler(err) {
    console.error(err);
    this.hadError = err;
    this.emit('error', err);
}
// ...
recoverFromError() {
    // Clear the flag first, then replay the chunk that was held back.
    this.hadError = false;
    if (this.bufferedChunk) {
        const held = this.bufferedChunk;
        this.bufferedChunk = undefined;
        this.wrappedWritableStream.write(...held);
    }
}

Note: You should only need to implement the _write function, but I do encourage you to dig around and play with the other implementation functions.

It is also worth noting that you may have some trouble writing to streams which have emitted an error event, but I will leave that to you as a separate problem to solve.

Here's another good resource on backpressure: https://nodejs.org/en/docs/guides/backpressuring-in-streams/

Forum answered 23/11, 2017 at 8:50 Comment(0)
