How to use drain event of stream.Writable in Node.js

In Node.js I'm using the fs.createWriteStream method to append data to a local file. In the Node documentation they mention the drain event when using fs.createWriteStream, but I don't understand it.

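var fs = require('fs');
// flags: 'a' appends to the file; the default flag 'w' truncates it first
var stream = fs.createWriteStream('fileName.txt', { flags: 'a' });
var result = stream.write(data);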

In the code above, how can I use the drain event? Is the event used properly below?

var data = 'this is my data';
if (!streamExists) {
  var stream = fs.createWriteStream('fileName.txt');
}

var result = stream.write(data);
if (!result) {
  stream.once('drain', function() {
    stream.write(data);
  });
}
Sponsor answered 21/9, 2013 at 12:17

The drain event is for when a writable stream's internal buffer has been emptied.

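This can only happen after a call to write() has returned false, which happens once the amount of buffered data reaches the stream's highWaterMark property. highWaterMark is the threshold (in bytes) at which write() starts signalling that the caller should stop writing. It is not a hard limit: data passed to write() is still buffered, so it is up to the code feeding the stream to pause until drain.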

This typically happens when data is read from one stream faster than it can be written to another resource. For example, take two streams:

var fs = require('fs');

var read = fs.createReadStream('./read');
var write = fs.createWriteStream('./write');

Now imagine that the file read is on an SSD that can read at 500 MB/s and write is on an HDD that can only write at 150 MB/s. The write stream will not be able to keep up and will start storing data in its internal buffer. Once the buffered data reaches the highWaterMark (16 KiB by default in the Node.js versions of the time; newer releases may use a larger default, which writable.writableHighWaterMark reports), write() starts returning false and the stream sets an internal needDrain flag. Once the internal buffer's length drops back to 0, the drain event is fired.

Internally, the drain event is emitted like this (simplified from the Node.js writable stream source):

if (state.length === 0 && state.needDrain) {
  state.needDrain = false;
  stream.emit('drain');
}

And this is where needDrain gets set, inside the writeOrBuffer function:

var ret = state.length < state.highWaterMark;
state.needDrain = !ret;
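To watch these two snippets interact without reading Node's source, here is a small sketch (not from the original answer) using a hypothetical slow Writable whose highWaterMark is set to 4 bytes: write() returns true while the buffered length is below the threshold, false once it reaches it, and drain fires after everything has been flushed.

```javascript
const { Writable } = require('stream');

// Hypothetical slow sink: each chunk takes 10 ms to "write",
// and highWaterMark is only 4 bytes so backpressure kicks in quickly.
const slow = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10);
  },
});

const results = [];

slow.once('drain', () => {
  results.push('drain');
  // Writing would be safe again here; this sketch just ends the stream.
  // end() is called only now, since a stream that is already ending
  // may not emit drain at all.
  slow.end();
});

// Each write adds 2 bytes to the buffered length.
results.push(slow.write('ab')); // length 2 < 4  -> true
results.push(slow.write('cd')); // length 4 >= 4 -> false, needDrain is set
results.push(slow.write('ef')); // still accepted and buffered -> false

slow.on('finish', () => {
  console.log(results); // [ true, false, false, 'drain' ]
});
```

Note that the third write is still accepted even though the second one already returned false; the return value is advice to the caller, not a rejection.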

To see how the drain event is used, take the example from the Node.js documentation.

function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}

The function's objective is to write data to a writable stream 1,000,000 times. The variable ok starts out as true, and the do...while loop keeps going only while ok is true. On each iteration, ok is set to the return value of writer.write(), which is false once the buffered data has reached highWaterMark. When ok becomes false, the loop stops and a one-time drain listener is registered; when drain fires, write() is called again and the writing resumes where it left off.


Regarding your code specifically, you don't need to use the drain event, because you are writing only once right after opening your stream. Since nothing has been written to the stream yet, its internal buffer is empty, and a single write() only returns false (and later triggers drain) if that one chunk is at least highWaterMark bytes (16 KiB by default for fs write streams at the time). The drain event is meant for writing many times, when the data you write piles up faster than the stream can flush it and the buffer reaches the highWaterMark of your writable stream.
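A quick way to check this, assuming a Node.js version where writable.writableHighWaterMark exists (v9.4+): a short string write returns true, while one write of at least highWaterMark bytes returns false and is later followed by drain. The file name is a hypothetical temporary path.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical scratch file in the OS temp directory.
const file = path.join(os.tmpdir(), 'drain-demo.txt');
const stream = fs.createWriteStream(file);

// A short string is far below highWaterMark, so write() returns true.
const small = stream.write('this is my data');

// A single chunk of highWaterMark bytes pushes the buffered length to the
// threshold, so write() returns false. The data is still accepted.
const big = stream.write(Buffer.alloc(stream.writableHighWaterMark));

console.log(small, big); // true false

stream.once('drain', () => {
  // The buffer has been flushed; more writes would be safe now.
  stream.end();
});
```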

Wildlife answered 21/9, 2013 at 14:35
I'm writing to the same stream again and again; the code above is just an example. My data may sometimes be huge. – Sponsor
In your example you write once. Please show how you're actually writing things to the stream, because writing a string once will never need the drain event. – Wildlife
Updated my question. Before writing, I check whether the stream exists; if it does, I write to it, otherwise I create a new stream. – Sponsor
In the example you're still only writing to the stream one time. Please show how you are going to write to the stream multiple times to establish the context that drain can be used in. – Wildlife
How can I reproduce writeOneMillionTimes? It doesn't seem to work when I paste it into my editor, and it doesn't show how writer is created. – Gendarmerie
@Gendarmerie The writer is any instance of a writable stream. You can infer that from the fact that it must have a write method and emit a drain event. – Wildlife
@Wildlife I couldn't find the once() function in the Node.js Stream API documentation (nodejs.org/api/stream.html). Can I regard it as an on() function? – Sixty
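once() is not stream-specific: writable streams are EventEmitters, so once() comes from the events module (emitter.once()), documented alongside on(). The difference is that a once() listener is removed after its first call. A minimal sketch with a plain EventEmitter standing in for the stream:

```javascript
const { EventEmitter } = require('events');

const emitter = new EventEmitter();
let onCalls = 0;
let onceCalls = 0;

emitter.on('drain', () => { onCalls += 1; });
emitter.once('drain', () => { onceCalls += 1; });

emitter.emit('drain');
emitter.emit('drain');

// on() ran for both emits; once() ran only for the first one
// and was then removed automatically.
console.log(onCalls, onceCalls); // 2 1
console.log(emitter.listenerCount('drain')); // 1
```

That is why the drain pattern registers its listener with once() each time write() returns false: with on(), every registration would stay attached, the handler would run again on later drains, and once more than 10 are attached Node.js prints a MaxListenersExceededWarning.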

Imagine you're connecting 2 streams with very different bandwidths, say, uploading a local file to a slow server. The (fast) file stream will emit data faster than the (slow) socket stream can consume it.

In this situation, node.js will keep data in memory until the slow stream gets a chance to process it. This can get problematic if the file is very large.

To avoid this, write() returns false when the stream's internal buffer is full (it has reached highWaterMark). If you stop writing, the stream will emit a drain event once that buffer has emptied, to indicate that it is appropriate to write again.

You can pause and resume the readable stream to control how fast it produces data.
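A minimal sketch of that manual approach (the file paths are hypothetical temp files): stop the readable when write() returns false, and resume it when the writable emits drain. This is essentially the bookkeeping pipe() does for you; error handling is left out to keep it short.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical source and destination files in the temp directory.
const src = path.join(os.tmpdir(), 'pause-resume-src.txt');
const dest = path.join(os.tmpdir(), 'pause-resume-dest.txt');
fs.writeFileSync(src, 'x'.repeat(1024 * 1024)); // 1 MiB of sample data

const read = fs.createReadStream(src);
const write = fs.createWriteStream(dest);

read.on('data', (chunk) => {
  // write() returns false once the writable's buffer reaches highWaterMark.
  if (!write.write(chunk)) {
    read.pause(); // stop producing data...
    write.once('drain', () => read.resume()); // ...until the buffer empties
  }
});

read.on('end', () => write.end());

write.on('finish', () => {
  console.log(fs.statSync(dest).size); // 1048576
});
```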

Better: you can use readable.pipe(writable) which will do this for you.
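Since Node.js 10 there is also stream.pipeline(), which handles backpressure the same way as pipe() but additionally reports errors from any stream in the chain to one callback and destroys the streams on failure (plain pipe() does neither). A sketch with hypothetical temp-file paths:

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { pipeline } = require('stream');

// Hypothetical source and destination files in the temp directory.
const src = path.join(os.tmpdir(), 'pipeline-src.txt');
const dest = path.join(os.tmpdir(), 'pipeline-dest.txt');
fs.writeFileSync(src, 'y'.repeat(256 * 1024)); // 256 KiB of sample data

// pipeline() applies the same backpressure as readable.pipe(writable),
// and calls the callback once the destination has finished (or on error).
const copied = new Promise((resolve, reject) => {
  pipeline(fs.createReadStream(src), fs.createWriteStream(dest), (err) => {
    if (err) reject(err);
    else resolve(fs.statSync(dest).size);
  });
});

copied.then((size) => console.log(`copied ${size} bytes`)); // copied 262144 bytes
```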

EDIT: There's a bug in your code: regardless of what write returns, your data has been accepted (buffered) and will be written. You don't need to retry it. In your case, you're writing data twice.

Something like this would work:

var packets = […],
    current = -1;

function niceWrite() {
  current += 1;

  if (current === packets.length)
    return stream.end();

  var nextPacket = packets[current],
      canContinue = stream.write(nextPacket);

  // wait until stream drains to continue
  if (!canContinue)
    stream.once('drain', niceWrite);
  else
    niceWrite();
}
Fungosity answered 21/9, 2013 at 12:47
In stream.once('drain', niceWrite), can we replace once with on? – Gangplank
Will this recursion cause a stack overflow if packets is very long? – Keeley
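On the recursion question: as written, niceWrite() calls itself synchronously every time write() returns true, so the call stack grows by one frame per packet until a write returns false. With very small packets, or a writable that completes its writes synchronously and so rarely (or never) returns false, a long packets array can overflow the stack. A loop-based variant avoids that. This is a sketch with a hypothetical packets array and temp file:

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical data: 100,000 small packets.
const packets = Array.from({ length: 100000 }, (_, i) => `packet ${i}\n`);
const file = path.join(os.tmpdir(), 'nice-write.txt');
const stream = fs.createWriteStream(file);

let current = 0;

function niceWrite() {
  // Write in a loop instead of recursing, so the stack depth stays constant.
  while (current < packets.length) {
    const canContinue = stream.write(packets[current]);
    current += 1;
    if (!canContinue) {
      // Buffer is full: continue with the next packet once it drains.
      stream.once('drain', niceWrite);
      return;
    }
  }
  stream.end();
}

niceWrite();
```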

Here is a version with async/await

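const fs = require('fs')

// Resolves right away if write() returned true,
// otherwise resolves once the stream emits 'drain'
const write = (writer, data) => {
  return new Promise((resolve) => {
    if (!writer.write(data)) {
      writer.once('drain', resolve)
    }
    else {
      resolve()
    }
  })
}

// usage
const run = async () => {
  const write_stream = fs.createWriteStream('...')
  const max = 1000000
  let current = 0
  while (current <= max) {
    // chunks must be strings or Buffers, so convert the number first
    await write(write_stream, String(current++))
  }
  write_stream.end()
}

run()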

https://gist.github.com/stevenkaspar/509f792cbf1194f9fb05e7d60a1fbc73
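A variant of the same helper, assuming Node.js 11.13+ where events.once() returns a promise. It only waits when write() returns false, and since events.once() rejects if the stream emits 'error' while waiting, a stream that fails during that wait rejects the promise instead of hanging. The output path and function names are illustrative.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { once } = require('events');

// Write one chunk; only wait when write() says the buffer is full.
// events.once() rejects if 'error' is emitted before 'drain'.
async function write(writer, data) {
  if (!writer.write(data)) {
    await once(writer, 'drain');
  }
}

async function run(file, max) {
  const writeStream = fs.createWriteStream(file);
  for (let current = 0; current <= max; current++) {
    // Chunks must be strings or Buffers, so convert the number.
    await write(writeStream, `${current}\n`);
  }
  writeStream.end();
  await once(writeStream, 'finish'); // all data has been flushed
}

// Hypothetical output file in the temp directory.
const file = path.join(os.tmpdir(), 'async-drain.txt');
const finished = run(file, 10000);
```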

Crass answered 19/3, 2019 at 15:33

This is a speed-optimized version using Promises (async/await). The caller has to check whether it gets a promise back, and only in that case call await. Awaiting every call can slow down the program by a factor of 3...

const fs = require('fs')

const write = (writer, data) => {
    // return a promise only when we have to wait for a drain
    if (!writer.write(data)) {
        return new Promise((resolve) => {
            writer.once('drain', resolve)
        })
    }
}

// usage
const run = async () => {
    const write_stream = fs.createWriteStream('...')
    const max = 1000000
    let current = 0
    while (current <= max) {
        // chunks must be strings or Buffers, so convert the number first
        const promise = write(write_stream, String(current++))
        // since drain happens rarely, awaiting each write call is really slow.
        if (promise) {
            // write() returned false, so wait for the drain event
            await promise
        }
    }
    write_stream.end()
}

run()
Clarkia answered 4/7, 2019 at 16:57
I'm getting this error: MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 drain listeners added. Use emitter.setMaxListeners() to increase limit. I'm calling write in a loop. Any suggestions to fix? – Zoon
@Zoon your loop must be async and await the promise returned by write. – Olpe
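To see where that warning comes from, here is a sketch with a hypothetical slow Writable (highWaterMark of 1 byte, so every write() returns false). When the loop does not await, each write() registers another once('drain') listener while the earlier ones are still pending, so 20 writes leave 20 listeners and Node.js prints the MaxListenersExceededWarning once more than 10 are attached; awaiting the promise keeps it at one.

```javascript
const { Writable } = require('stream');

// Hypothetical slow sink: every chunk takes 1 ms and highWaterMark is
// 1 byte, so every write() returns false.
function makeSlowSink() {
  return new Writable({
    highWaterMark: 1,
    write(chunk, encoding, callback) {
      setTimeout(callback, 1);
    },
  });
}

// The helper from the answer: returns a promise only when drain is needed.
const write = (writer, data) => {
  if (!writer.write(data)) {
    return new Promise((resolve) => {
      writer.once('drain', resolve);
    });
  }
};

// Writes 20 chunks and reports the largest number of pending drain listeners.
async function maxDrainListeners(awaitDrain) {
  const sink = makeSlowSink();
  let max = 0;
  for (let i = 0; i < 20; i++) {
    const promise = write(sink, 'x');
    max = Math.max(max, sink.listenerCount('drain'));
    if (awaitDrain && promise) {
      await promise;
    }
  }
  sink.end();
  return max;
}

const counts = (async () => [
  await maxDrainListeners(false), // listeners pile up (the warning is printed)
  await maxDrainListeners(true), // never more than one at a time
])();

counts.then((c) => console.log(c)); // [ 20, 1 ]
```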

© 2022 - 2024 — McMap. All rights reserved.