How i can resume after error event in piped stream in nodejs?
Asked Answered
W

2

6

After i emit error event in MyWritableStream, data transmission stops. What i need to do to resume data transfer?

var readable = fs.createReadStream('test.txt');
var writable = new MyWritableStream();

writable.on('error', function(error) {
    console.log('error', error);
    // How i can resume?
});

writable.on('finish', function(){
    console.log('finished');
})

readable.pipe(writable);
Wistful answered 28/10, 2015 at 9:16 Comment(0)
K
2

I know this question is old, but you might wanna check out https://github.com/miraclx/xresilient

I built this for this exact same reason (works best with seekable streams).

You define a function that returns a readable stream, the library measures the number of bytes that have passed through until an error is met.

Once the readable stream encounters an error event, it recalls the defined function with the number of bytes read so you can index the stream source.

Example:

const fs = require('fs');
const xresilient = require('xresilient');

const readable = xresilient(({bytesRead}) => {
  return generateSeekableStreamSomehow({start: bytesRead});
}, {retries: 5});

const writable = fs.createWriteStream('file.test');

readable.pipe(writable);
  • File streams are indexable with the start option of the fs.createReadStream() function.
  • HTTP Requests are indexable with the Range HTTP Header.

Check it out. https://www.npmjs.com/package/xresilient

Kelvinkelwen answered 9/1, 2020 at 5:35 Comment(0)
W
1

I am not sure, if it is a normal practice, but i can't see another solution for now & it works for me. If you can advise more accurate solution, please do it.

We can track readable stream instance using pipe event in writeable one:

function WriteableStream(options) {
    Writable.call(this, options);

    this.source = null;

    var instance = this;

    this.on('pipe', function(source){
        instance.source = source;
    });
}
util.inherits(WriteableStream, Writable);

So, when we emit error event, and readable stream is unpiped automatically, we can re-pipe it ourself:

WriteableStream.prototype._write = function(chunk, encoding, done) {
    this.emit('error', new Error('test')); // unpipes readable
    done();
};

WriteableStream.prototype.resume = function() {
    this.source.pipe(this); // re-pipes readable
}

Finally, we will use it the following way:

var readable = fs.createReadStream(file);
var writeable = new WriteableStream();

writeable.on('error', function(error) {
    console.log('error', error);
    writeable.resume();
});

readable.pipe(writeable);
Wistful answered 28/10, 2015 at 10:2 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.