Proper way to consume NodeJS stream into buffer and write stream

I have a need to pipe a readable stream into both a buffer (to be converted into a string) and a file. The stream is coming from node-fetch.

NodeJS streams have two modes: paused and flowing. From what I understand, as soon as a 'data' listener is attached, the stream switches to flowing mode. I want to make sure the way I am reading the stream will not lose any bytes.
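
For illustration, here is a small sketch (not part of the original code) showing that switch, using Readable.from and the readableFlowing property, both of which assume a reasonably recent Node (12+):

const { Readable } = require('stream')

const src = Readable.from(['a', 'b', 'c'])
console.log(src.readableFlowing) // null: no consumer attached yet

// Attaching a 'data' listener switches the stream into flowing mode.
src.on('data', chunk => console.log('chunk:', chunk))
console.log(src.readableFlowing) // true: the stream is now flowing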

Method 1: piping and reading from 'data':

const fs = require('fs')
const fetch = require('node-fetch')

fetch(url).then(
  response =>
    new Promise(resolve => {
      const buffers = []
      const dest = fs.createWriteStream(filename)
      // Pipe the body to the file and collect the same chunks into memory.
      response.body.pipe(dest)
      response.body.on('data', chunk => buffers.push(chunk))
      dest.on('close', () => resolve(Buffer.concat(buffers).toString()))
    })
)

Method 2: using PassThrough streams:

const fs = require('fs')
const fetch = require('node-fetch')
const { PassThrough } = require('stream')

fetch(url).then(
  response =>
    new Promise(resolve => {
      const buffers = []
      const dest = fs.createWriteStream(filename)
      const forFile = new PassThrough()
      const forBuffer = new PassThrough()
      // Fan the body out into two independent streams.
      response.body.pipe(forFile).pipe(dest)
      response.body.pipe(forBuffer)
      forBuffer.on('data', chunk => buffers.push(chunk))
      dest.on('close', () => resolve(Buffer.concat(buffers).toString()))
    })
)

Is the second method required so that no data is lost? Is the second method wasteful, since two more streams could end up buffering data? Or is there another way to fill a buffer and a write stream simultaneously?

Luzern asked 16/7, 2018 at 23:47. Comments (2):
It is actually working as expected; I was just not sure if I was lucky enough with a fast write stream. I have read a few posts saying that write streams will try to call read at their own pace (pulling), and I have also read that the 'data' listener will cause the stream to flow constantly (pushing). - Luzern
You could just use fs.writeFile() if you're just going to read the whole file into memory first. There is no need to .pipe() it. If you don't need the whole file in memory, then .pipe() is more efficient. - Foremast
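
A minimal sketch of that suggestion, assuming node-fetch 2.x (where response.buffer() resolves the whole body to a Buffer) and the same url and filename as in the question:

const fs = require('fs')
const fetch = require('node-fetch')

fetch(url)
  // Read the entire body into memory once...
  .then(response => response.buffer())
  // ...then write the file and produce the string from the same Buffer.
  .then(buf =>
    fs.promises.writeFile(filename, buf).then(() => buf.toString())
  )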

You won't miss any data, since .pipe() internally attaches its own 'data' listener (src.on('data')) and writes each chunk to the target stream.

So any chunk written to your dest stream will also be emitted to your response.body.on('data') listener, where you're buffering the chunks. In any case, you should listen for 'error' events and reject if any error occurs.

And while your second method will work, you don't need it.
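
For example, Method 1 from the question with that error handling added (a sketch, with url and filename as in the question):

const fs = require('fs')
const fetch = require('node-fetch')

fetch(url).then(
  response =>
    new Promise((resolve, reject) => {
      const buffers = []
      const dest = fs.createWriteStream(filename)
      response.body.pipe(dest)
      response.body.on('data', chunk => buffers.push(chunk))
      // Surface failures from either stream instead of swallowing them.
      response.body.on('error', reject)
      dest.on('error', reject)
      dest.on('close', () => resolve(Buffer.concat(buffers).toString()))
    })
)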


This is a chunk of code from the .pipe() implementation:

  src.on('data', ondata);
  function ondata(chunk) {
    debug('ondata');
    var ret = dest.write(chunk);
    debug('dest.write', ret);
    if (ret === false) {
      // If the user unpiped during `dest.write()`, it is possible
      // to get stuck in a permanently paused state if that write
      // also returned false.
      // => Check whether `dest` is still a piping destination.
      if (((state.pipesCount === 1 && state.pipes === dest) ||
           (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
          !cleanedUp) {
        debug('false write response, pause', state.awaitDrain);
        state.awaitDrain++;
      }
      src.pause();
    }
  }
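
For context, a simplified sketch of the same backpressure pattern that .pipe() implements for you (illustrative code with made-up file names, not the actual internals):

const fs = require('fs')

const src = fs.createReadStream('input.bin')    // hypothetical source file
const dest = fs.createWriteStream('output.bin') // hypothetical destination

src.on('data', chunk => {
  // write() returns false once dest's internal buffer is full...
  if (!dest.write(chunk)) {
    // ...so pause the source and resume only after dest drains.
    src.pause()
    dest.once('drain', () => src.resume())
  }
})
src.on('end', () => dest.end())
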
Iodic answered 17/7, 2018 at 0:08
