Node.js Streams Readable to Transform

I have been trying to use a readable and a transform stream to process a very large file. The problem that I seem to come across is that if I don't put a writable stream at the end, the program seems to terminate before the result gets returned.

Example: rstream.pipe(split()).pipe(tstream)

My tstream has an emitter that emits when a counter hits a threshold. When that threshold is set to a low number, I get a result, but when it's high, it's not returning anything. If I pipe it to a file writer, it always returns a result. Am I missing something obvious?

Code:

// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');

var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
    console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {

    // THIS WORKS if I have a low counter for qtransformstream,
    // but when it's high, I do not get a result
    //   rstream.pipe(split()).pipe(qtransformstream);

    // this always works
    rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);

};

Here is the code for QTransformStream:

// Dependencies
var Transform = require('stream').Transform,
    util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
    // Create this as a Transform Stream
    Transform.call(this, {
        objectMode: true
    });
    // Default the Qbase to 32 as an assumption
    this.Qbase = 32;
    if (Quser) {
        this.Quser = Quser;
    } else {
        this.Quser = 20;
    }
    this.Qpass = this.Quser + this.Qbase;
    this.Counter = 0;
    // Variables used as intermediates
    this.Qmin = 120;
    this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
    var Qmin = this.Qmin;
    var Qmax = this.Qmax;
    var Qbase = this.Qbase;
    var Quser = this.Quser;
    this.Counter++;
    // Stop the stream after 100 reads and emit the data
    if (this.Counter === 100) {
        this.emit('completed', this.Qbase, this.Quser);
    }
    // do some calcs on this.Qbase

    this.push('something not important');
    callback();
};
// export the object
module.exports = TransformStream;
Jonquil answered 4/8, 2015 at 15:34 Comment(4)
Can you post the code for the QTransformStream implementation? (Heyman)
How many lines do you have in the input file, and what is the maximum counter value in that case? If the counter value is greater than the number of lines, the completed event won't be emitted. Also, you need to push null to end the stream. I'm not sure what you have in 'something not important', but there should be a null at some point. (Grotius)
There are definitely fewer lines than the counter, about 7000 lines. It does work when I pipe this to a write stream. Does a transform stream need a push(null) to work? (Jonquil)
You're right, it's not. It might be something else. (Grotius)
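On the push(null) question in the comments: a Transform does not have to call push(null) itself. Once its writable side is ended, Node.js calls the optional _flush and then ends the readable side on its own. The minimal sketch below assumes a modern Node.js; `UpperCase` and `collect` are hypothetical names used only for illustration.

```javascript
const { Transform } = require('stream');

// Hypothetical example transform: upper-cases each string chunk.
class UpperCase extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    // No push(null) here: the Transform base class ends the readable side.
    callback(null, String(chunk).toUpperCase());
  }
}

// Hypothetical helper: writes the inputs, then resolves with everything read back.
function collect(inputs) {
  return new Promise((resolve, reject) => {
    const t = new UpperCase();
    const out = [];
    t.on('data', (d) => out.push(d));
    t.on('end', () => resolve(out)); // fires without any explicit push(null)
    t.on('error', reject);
    for (const x of inputs) t.write(x);
    t.end(); // ending the writable side is what eventually ends the readable side
  });
}

collect(['a', 'b', 'c']).then((out) => console.log(out)); // [ 'A', 'B', 'C' ]
```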

EDIT:

Also, I don't know how high your counter goes, but if you fill up the buffer, the pipeline will stop passing data to the transform stream. In that case 'completed' is never emitted, because you never reach the counter limit. Try changing your highWaterMark.
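A rough sketch of the highWaterMark suggestion, assuming the transform is built in object mode like QTransformStream. In object mode highWaterMark counts objects (default 16), not bytes. `CountingTransform` and `countProcessed` are hypothetical names. With nothing consuming the output, the number of chunks that reach _transform should grow with highWaterMark but still stop short of the total.

```javascript
const { Transform } = require('stream');

// Hypothetical stand-in for QTransformStream that only counts the chunks it sees.
class CountingTransform extends Transform {
  constructor(highWaterMark) {
    // In object mode, highWaterMark is a number of objects, not bytes.
    super({ objectMode: true, highWaterMark });
    this.counter = 0;
  }

  _transform(chunk, encoding, callback) {
    this.counter++;
    callback(null, chunk); // push to the readable side, which nobody reads
  }
}

// Writes `total` lines with no consumer attached and reports how many reached
// _transform once the stream has had time to settle.
function countProcessed(highWaterMark, total) {
  return new Promise((resolve) => {
    const t = new CountingTransform(highWaterMark);
    for (let i = 0; i < total; i++) t.write(`line ${i}`); // excess writes are buffered
    setTimeout(() => resolve(t.counter), 50);
  });
}

(async () => {
  console.log('highWaterMark 16: ', await countProcessed(16, 1000));
  console.log('highWaterMark 500:', await countProcessed(500, 1000));
})();
```

Raising highWaterMark only moves the point where the pipeline stalls; with a large enough threshold the completed event is still never reached, which is why a real consumer is the better fix.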

EDIT 2: A Little Better Explanation

As you well know, a transform stream is a duplex stream, which basically means it can accept data from a source and send data to a destination: data is written into its writable side and read out of its readable side. The transform stream inherits from both the readable stream and the writable stream implemented by Node.js (through Duplex). There is one caveat, though: a transform stream does not have to implement the _read or _write functions; it implements _transform (and optionally _flush) instead. In that sense it is closely related to the lesser-known PassThrough stream, which is simply a transform stream that passes its input through unchanged.
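A minimal sketch of that duplex behavior, assuming a modern Node.js: the subclass implements only _transform, data written into the writable side can be read back from the readable side, and PassThrough does the same without changing anything. `Doubler` is a hypothetical example class.

```javascript
const { Transform, PassThrough } = require('stream');

// Hypothetical example: only _transform is implemented; the Transform base class
// supplies the _read/_write plumbing of the underlying Duplex stream.
class Doubler extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    callback(null, chunk * 2); // same as this.push(chunk * 2); callback();
  }
}

const doubler = new Doubler();
doubler.write(21);              // data goes in through the writable side...
const doubled = doubler.read(); // ...and comes out of the readable side
console.log(doubled);           // 42

// PassThrough is a ready-made Transform that forwards chunks unchanged.
const passThrough = new PassThrough({ objectMode: true });
passThrough.write('unchanged');
const passed = passThrough.read();
console.log(passed);            // unchanged
```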

Because the transform stream implements a write stream, you might expect it to behave like a file write stream, which always has a destination to dump its contents. It doesn't: when you create a transform stream, you can't specify a place to send your content, so the transformed data just sits in the buffer of its readable side until something reads it. The only way to pass data completely through your transform stream is to consume its output, for example by piping it to a write stream; otherwise your streams get backed up and can't accept more data, because there is no place for the data to go.
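To see the backup in action, here is a minimal sketch. It assumes `Readable.from` (Node.js 12.3+) as a stand-in for `rstream.pipe(split())`, and `LineCounter` is a hypothetical stand-in for QTransformStream. Without a consumer the transform only sees a small number of lines; calling `resume()` reads and discards the output, much like piping to a write stream that throws the data away, and every line gets through.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical stand-in for QTransformStream: counts the lines it receives.
class LineCounter extends Transform {
  constructor() {
    super({ objectMode: true });
    this.count = 0;
  }

  _transform(chunk, encoding, callback) {
    this.count++;
    this.push('something not important'); // output that may never be read
    callback();
  }
}

const TOTAL = 7000; // roughly the line count mentioned in the comments

// Pipes TOTAL lines into the transform and reports how many reached _transform.
function run(consume) {
  return new Promise((resolve) => {
    const lines = Array.from({ length: TOTAL }, (_, i) => `line ${i}`);
    const lineCounter = new LineCounter();
    Readable.from(lines).pipe(lineCounter);
    if (consume) {
      lineCounter.resume(); // read and discard the output
      lineCounter.on('finish', () => resolve(lineCounter.count)); // all lines processed
    } else {
      setTimeout(() => resolve(lineCounter.count), 200); // report where it stalled
    }
  });
}

(async () => {
  console.log('without a consumer:', await run(false)); // far fewer than 7000
  console.log('with a consumer:   ', await run(true));  // 7000
})();
```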

This is why it always works when you pipe to a write stream. The write stream relieves the backup by sending the data to a destination, so all of your data is piped through and the completed event is emitted.

The reason your code works without the write stream when the threshold is small is that you aren't filling up your stream, so the transform stream can accept enough data for the threshold to be hit and the completed event to fire. As the threshold increases, the amount of data your stream can hold without sending it somewhere else (a write stream) stays the same, bounded by its highWaterMark. Your stream gets backed up, it can no longer accept data, and the completed event is never emitted. Once every stream in the pipeline is paused waiting for a reader, Node.js has no pending work left, so the process simply exits; that is why the program seems to terminate before the result gets returned.

I would venture to say that if you increase the highWaterMark of the transform stream, you will be able to raise your threshold and still have the code work. That approach is the wrong fix, though. Instead, pipe your stream to a write stream that sends the data to /dev/null. You can create that write stream like this:

var writer = fs.createWriteStream('/dev/null');

The section on buffering in the Node.js stream docs explains the behavior you are running into.
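/dev/null only exists on Unix-like systems. A portable alternative, sketched here, is a Writable whose write function discards each chunk immediately; `nullSink` and `ThresholdTransform` are hypothetical names, and `Readable.from` (Node.js 12.3+) stands in for `rstream.pipe(split())`. Piping the transform into the sink has the same effect as the /dev/null file stream: the output is drained, so the threshold is reached.

```javascript
const { Readable, Transform, Writable } = require('stream');

// Hypothetical helper: a Writable that accepts and discards every chunk,
// playing the role of fs.createWriteStream('/dev/null') on any platform.
function nullSink() {
  return new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      callback(); // drop the chunk and signal readiness for the next one
    },
  });
}

// Minimal stand-in for QTransformStream: emits 'completed' when the counter hits a threshold.
class ThresholdTransform extends Transform {
  constructor(threshold) {
    super({ objectMode: true });
    this.threshold = threshold;
    this.counter = 0;
  }

  _transform(chunk, encoding, callback) {
    this.counter++;
    if (this.counter === this.threshold) this.emit('completed', this.counter);
    this.push('something not important');
    callback();
  }
}

let completedAt = null;
const transform = new ThresholdTransform(5000);
transform.on('completed', (n) => {
  completedAt = n;
  console.log('Result: ' + n);
});

// 7000 lines, roughly the file size mentioned in the question's comments.
const lines = Array.from({ length: 7000 }, (_, i) => `line ${i}`);
const sink = nullSink();
Readable.from(lines).pipe(transform).pipe(sink);
```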

Apophysis answered 28/8, 2015 at 3:41 Comment(2)
The streams in Node aren't as simple as they look. I'd love to see a good, detailed explanation of these subtleties. (Unipolar)
I tried to give a better explanation; let me know if any part of it isn't clear. (Apophysis)

You don't interrupt _transform, and the process goes far, far away. Try:

this.emit('completed', ...);
this.end();

That's why the 'program seems to terminate before the result gets returned'.

And don't output any useless data:

var wstream = fs.createWriteStream('/dev/null');

Good luck!

Ecphonesis answered 31/8, 2015 at 13:51 Comment(0)

I would suggest using a Writable rather than a Transform stream. Rename _transform to _write (and drop the this.push call, since a Writable has no readable side), and your code will consume the stream when you pipe into it. A transform stream, as @Bradgnar already pointed out, needs a consumer; otherwise it will stop the readable stream from pushing more data into its buffer.
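A sketch of that suggestion, assuming the same counting logic as QTransformStream (the Qbase/Quser calculations are left out). `QWritableStream` is a hypothetical name, and `Readable.from` (Node.js 12.3+) stands in for `rstream.pipe(split())`. Because the Writable is the end of the pipeline, piping into it consumes the whole input and the threshold is reached without any extra sink.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical Writable version of QTransformStream: _transform becomes _write.
class QWritableStream extends Writable {
  constructor(threshold = 100) {
    super({ objectMode: true });
    this.threshold = threshold;
    this.counter = 0;
  }

  _write(chunk, encoding, callback) {
    this.counter++;
    if (this.counter === this.threshold) {
      this.emit('completed', this.counter);
    }
    // ...calculations on the chunk would go here; nothing is passed downstream...
    callback(); // ready for the next line
  }
}

let result = null;
const qstream = new QWritableStream(5000);
qstream.on('completed', (count) => {
  result = count;
  console.log('Result: ' + count);
});

const lines = Array.from({ length: 7000 }, (_, i) => `line ${i}`);
Readable.from(lines).pipe(qstream);
```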

Luxe answered 3/9, 2015 at 17:48 Comment(0)
