Node.js: How to read a stream into a buffer?
Asked Answered
C

12

120

I wrote a pretty simple function that downloads an image from a given URL, resizes it, and uploads it to S3 (using 'gm' and 'knox'). I have no idea whether I'm reading the stream into a buffer correctly. (Everything is working, but is it the correct way?)

Also, I want to understand something about the event loop: how do I know that one invocation of the function won't leak anything, or change the 'buf' variable of another invocation that is already running? (Or is this scenario impossible because the callbacks are anonymous functions?)

var http = require('http');
var https = require('https');
var s3 = require('./s3');
var gm = require('gm');

module.exports.processImageUrl = function(imageUrl, filename, callback) {
    var client = http;
    if (imageUrl.substr(0, 5) == 'https') { client = https; }

    client.get(imageUrl, function(res) {
        if (res.statusCode != 200) {
            return callback(new Error('HTTP Response code ' + res.statusCode));
        }

        gm(res)
            .geometry(1024, 768, '>')
            .stream('jpg', function(err, stdout, stderr) {
                if (!err) {
                    var buf = new Buffer(0);
                    stdout.on('data', function(d) {
                        buf = Buffer.concat([buf, d]);
                    });

                    stdout.on('end', function() {
                        var headers = {
                            'Content-Length': buf.length
                            , 'Content-Type': 'Image/jpeg'
                            , 'x-amz-acl': 'public-read'
                        };

                        s3.putBuffer(buf, '/img/d/' + filename + '.jpg', headers, function(err, res) {
                            if (err) {
                                return callback(err);
                            } else {
                                return callback(null, res.client._httpMessage.url);
                            }
                        });
                    });
                } else {
                    callback(err);
                }
            });
    }).on('error', function(err) {
        callback(err);
    });
};
Command answered 10/1, 2013 at 23:34 Comment(3)
These answers can't be real, can they? Where is the standard library function for this?Vacationist
@Vacationist Here is the library function for that.Bluster
@Bluster nice, that's much better!Vacationist
P
109

Overall I don't see anything that would break in your code.

Two suggestions:

The way you are combining Buffer objects is suboptimal because it has to copy all the pre-existing data on every 'data' event. It would be better to push the chunks into an array and concat them all at the end.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var buf = Buffer.concat(bufs);
})

For performance, I would look into if the S3 library you are using supports streams. Ideally you wouldn't need to create one large buffer at all, and instead just pass the stdout stream directly to the S3 library.

As for the second part of your question, that isn't possible. When a function is called, it is allocated its own private context, and everything defined inside of that will only be accessible from other items defined inside that function.
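To make that concrete, here is a minimal sketch of my own (not from your code) showing that each invocation gets its own buf; concurrent calls never see each other's variable:

function collect(label, done) {
    let buf = Buffer.alloc(0); // local to *this* invocation

    // Simulate an async 'data' event arriving later.
    setTimeout(() => {
        buf = Buffer.concat([buf, Buffer.from(label)]);
        done(label, buf.toString());
    }, Math.random() * 10);
}

collect('first', console.log);  // eventually logs: first first
collect('second', console.log); // eventually logs: second second
// Each callback closes over its own buf, so one call cannot clobber another's variable.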

Update

Dumping the file to the filesystem would probably mean less memory usage per request, but file IO can be pretty slow so it might not be worth it. I'd say that you shouldn't optimize too much until you can profile and stress-test this function. If the garbage collector is doing its job you may be overoptimizing.

With all that said, there are better ways anyway, so don't use files. Since all you want is the length, you can calculate that without needing to append all of the buffers together, so then you don't need to allocate a new Buffer at all.

var pause_stream = require('pause-stream');

// Your other code.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var contentLength = bufs.reduce(function(sum, buf){
    return sum + buf.length;
  }, 0);

  // Create a stream that will emit your chunks when resumed.
  var stream = pause_stream();
  stream.pause();
  while (bufs.length) stream.write(bufs.shift());
  stream.end();

  var headers = {
      'Content-Length': contentLength,
      // ...
  };

  s3.putStream(stream, ....);
});
Paterfamilias answered 11/1, 2013 at 0:5 Comment(6)
it supports streams, but I need to know the Content-Length for the S3 headers, and that's impossible with streamsCommand
btw - what about the second part of the question?Command
is it a better practice to pipe the stream from 'gm' to a file and then open a stream from that file and upload to S3, using the file size as Content-Length? as far as I understand this eliminates loading the entire file into memory like I'm doing nowCommand
just want to mention that the bufs.pop() call should be bufs.unshift(), or even easier just replace the entire while loop with a simple for loop.Desiccate
on the on('data') you can just do bytes += data.length instead of reducing the array at the end.Anastos
@Anastos True, but then you have to maintain two separate accumulator variables. I prefer maintaining the single one and calculating the length later. I'm not convinced it would make an appreciable difference in performance or anything.Paterfamilias
S
75

Javascript snippet

function stream2buffer(stream) {

    return new Promise((resolve, reject) => {
        
        const _buf = [];

        stream.on("data", (chunk) => _buf.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(_buf)));
        stream.on("error", (err) => reject(err));

    });
} 

Typescript snippet

async function stream2buffer(stream: Stream): Promise<Buffer> {

    return new Promise<Buffer>((resolve, reject) => {

        const _buf = Array<any>();

        stream.on("data", chunk => _buf.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(_buf)));
        stream.on("error", err => reject(`error converting stream - ${err}`));

    });
} 
Sulfate answered 27/5, 2021 at 20:38 Comment(7)
this works incredibly well... this is the MVP right hereSwick
Actually, for me the JS version of stream2buffer() doesn't return a proper value.Watermelon
Hi @MikeB. As you see, the code is very simple (also to debug). Could you give us more details concerning "doesn't return a proper value"?Sulfate
@bsorrentino, I think the problem was with the way I return the value. In my case const pdfBuffered = `data:application/pdf;base64, ${Buffer.concat(chunks).toString("base64")}`; worked OK. So, not just Buffer.concat(_buf)) but Buffer.concat(chunks).toString("base64").Watermelon
Excellent. Needed to convert a graphql-upload ReadStream to a buffer to process an image. This worked wonderfully!Physiography
Unbelievable that this isn't in the stdlibVacationist
I'm still not convinced that this is the best way to do this, because like others I'm shocked this isn't in the stdlib, but it's short and effective, so it gets my voteAlkalosis
P
33

Note: this solely answers "How to read a stream into a buffer?" and ignores the context of the original question.

ES2018 Answer

Since Node 11.14.0, readable streams support async iterators.

const buffers = [];

// node.js readable streams implement the async iterator protocol
for await (const data of readableStream) {
  buffers.push(data);
}

const finalBuffer = Buffer.concat(buffers);

Bonus: In the future, this could get better with the stage 3 Array.fromAsync proposal.

// 🛑 DOES NOT WORK (yet!)
const finalBuffer = Buffer.concat(await Array.fromAsync(readableStream));
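If you want the async-iterator version as a reusable helper, here is a minimal sketch (the function name, the file path, and the usage below are mine, not part of the answer above):

const { createReadStream } = require('node:fs');

// Any Node.js Readable works here, since readable streams implement the async iterator protocol.
async function streamToBuffer(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks);
}

// Example usage; './example.bin' is a hypothetical file path.
streamToBuffer(createReadStream('./example.bin'))
  .then((buf) => console.log('read %d bytes', buf.length))
  .catch(console.error);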
Profluent answered 7/7, 2022 at 0:28 Comment(3)
Would using the iterator be preferred over the eventing approach proposed in the other answers? I had actually gone the iterator route before reviewing other suggestions, and then found this SO question where most of the answers suggest going the eventing route. What do you think?Rupe
Yeah, there's nothing about this approach that would be worse than the events. Most of those answers were written prior to this feature existing. I'd say this is very idiomatic to the language; however, many JS devs still don't know about it, so the best approach is probably the one your team understands the most.Profluent
function dies on / after first chunk, no response, no error... what could it be? any help will be appreciatedStocky
P
9

You can easily do this using node-fetch if you are pulling from http(s) URIs.

From the readme:

fetch('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png')
    .then(res => res.buffer())
    .then(buffer => console.log(buffer));
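Note that res.buffer() is a node-fetch-specific helper rather than part of the standard Fetch Response (as a comment below points out, it is missing in some setups). With the built-in global fetch, or newer node-fetch versions, you can go through arrayBuffer() instead; a rough sketch:

fetch('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png')
    .then(res => res.arrayBuffer())
    .then(arrayBuffer => Buffer.from(arrayBuffer)) // wrap the ArrayBuffer in a Node.js Buffer
    .then(buffer => console.log(buffer.length));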
Parthinia answered 5/2, 2018 at 12:31 Comment(2)
You can also abuse Response from node-fetch to get a buffer from any stream not just http: new Response(stream).buffer().Rubetta
Response.buffer is not a function. So um... what? Edit: Response.arrayBuffer seems to workAmon
C
6

You can convert your readable stream to a buffer and integrate it into your code asynchronously like this.

async function streamToBuffer(stream) {
    return new Promise((resolve, reject) => {
        const data = [];

        stream.on('data', (chunk) => {
            data.push(chunk);
        });

        stream.on('end', () => {
            resolve(Buffer.concat(data));
        });

        stream.on('error', (err) => {
            reject(err);
        });
    });
}

The usage would be as simple as:

 // usage
  const myStream // your stream
  const buffer = await streamToBuffer(myStream) // this is a buffer
Coach answered 19/1, 2021 at 14:28 Comment(1)
Doesn't work with process.stdin on Windows - if no input has been piped into the command, neither end nor error appears to fire.Maxima
B
6

Node.js: How to read a stream into a buffer?

import { buffer } from 'node:stream/consumers';

const buf = await buffer(stream); // <-- Use your stream here!

or in case you are still using CommonJS modules:

const { buffer } = require('node:stream/consumers');   

const buf = await buffer(stream); // <-- Use your stream here!

Note: utility consumers were added in Node.js v16.7.0.


Longer answer

As of March 2024, utility consumers provide the following functions to consume streams: arrayBuffer, blob, buffer, json, text. Here is a code example.

The package.json file next to my index.js script:

{
    "type": "module"
}

My index.js file:

import { buffer, text, json } from 'node:stream/consumers'
import { createReadStream } from 'node:fs'

let stream

stream = createReadStream('./package.json')
const buf = await buffer(stream)
console.log('As Buffer:')
console.log(buf)
console.log()

stream = createReadStream('./package.json')
const obj = await json(stream)
console.log('As Object:')
console.log(obj)
console.log()

stream = createReadStream('./package.json')
const str = await text(stream)
console.log('As String:')
console.log(str)

When executed as node ./index.js, this code prints:

As Buffer:
<Buffer 7b 0a 20 20 20 20 22 74 79 70 65 22 3a 20 22 6d 6f 64 75 6c 65 22 0a 7d>

As Object:
{ type: 'module' }

As String:
{
    "type": "module"
}
Bluster answered 1/3 at 12:59 Comment(1)
This was the answer I was waiting for =)Stoned
C
4

I suggest loganfsmyth's method, using an array to hold the data.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var buf = Buffer.concat(bufs);
});

In my current working example, I am working with GridFS and npm's Jimp.

var data = [];
var bucket = new GridFSBucket(getDBReference(), { bucketName: 'images' });
var dwnldStream = bucket.openDownloadStream(info[0]._id); // original size

dwnldStream.on('data', function(chunk) {
    data.push(chunk);
});

dwnldStream.on('end', function() {
    var buff = Buffer.concat(data);
    console.log("buffer: ", buff);
    jimp.read(buff)
        .then(image => {
            console.log("read the image!");
            IMAGE_SIZES.forEach((size) => {
                resize(image, size);
            });
        });
});

I did some other research with a string method, but that did not work, perhaps because I was reading from an image file; the array method did work, though.

const DISCLAIMER = "DONT DO THIS";
var data = "";
stdout.on('data', function(d) {
    data += d; // implicitly converts each binary chunk to a string
});
stdout.on('end', function() {
    var buf = Buffer.from(data);
    // do work with the buffer here
});

When I did the string method, I got this error from npm's Jimp:

buffer:  <Buffer 00 00 00 00 00>
{ Error: Could not find MIME for Buffer <null>

Basically, I think the type coercion from binary to string didn't work so well: appending a Buffer chunk to a string implicitly decodes it as UTF-8, which corrupts binary image data.

Church answered 3/6, 2019 at 19:53 Comment(0)
L
3

For a web-style ReadableStream (the WHATWG streams API), you can do this with a reader:

async function toBuffer(stream: ReadableStream<Uint8Array>) {
  const list = []
  const reader = stream.getReader()
  while (true) {
    const { value, done } = await reader.read()
    if (value)
      list.push(value)
    if (done)
      break
  }
  return Buffer.concat(list)
}

or using the buffer consumer from node:stream/consumers (note that buffer() returns a promise, so it needs an await):

import { buffer } from 'node:stream/consumers'

const buf = await buffer(stream)
Ladida answered 13/11, 2022 at 13:42 Comment(2)
Are you sure this works? I don't think this will read the entire stream. "The read() method of the ReadableStreamDefaultReader interface returns a Promise providing access to the next chunk in the stream's internal queue." #14269733Thermodynamic
@Thermodynamic I have updated my answer to concat all chunks. Thanks.Ladida
F
1

I suggest keeping an array of buffers and concatenating them into the resulting buffer only once at the end. It's easy to do manually, or one could use node-buffers.
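A minimal sketch of that manual approach (my own illustration; it is essentially the same pattern as the accepted answer):

var chunks = [];
stream.on('data', function(chunk) { chunks.push(chunk); }); // just collect references, no copying yet
stream.on('end', function() {
    var buf = Buffer.concat(chunks); // single copy at the very end
    // use buf here
});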

Frenchify answered 11/1, 2013 at 0:3 Comment(0)
S
1

I just want to post my solution. The previous answers were pretty helpful for my research. I use length-stream to get the size of the stream, but the problem here is that its callback is fired near the end of the stream, so I also use stream-cache to cache the stream and pipe it to the res object once I know the content-length. In case of an error, the stream's 'error' handler passes it straight to the callback:

var StreamCache = require('stream-cache');
var lengthStream = require('length-stream');

var _streamFile = function(res , stream , cb){
    var cache = new StreamCache();

    var lstream = lengthStream(function(length) {
        res.header("Content-Length", length);
        cache.pipe(res);
    });

    stream.on('error', function(err){
        return cb(err);
    });

    stream.on('end', function(){
        return cb(null , true);
    });

    return stream.pipe(lstream).pipe(cache);
}
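A hypothetical usage sketch (Express, the route, and the file path are my own assumptions for illustration, not part of the answer):

var express = require('express');
var fs = require('fs');

var app = express();

app.get('/download', function(req, res) {
    // Any readable stream works; a file stream is just the simplest example.
    var fileStream = fs.createReadStream('./some-file.bin');
    _streamFile(res, fileStream, function(err) {
        if (err) { console.error('streaming failed:', err); }
    });
});

app.listen(3000);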
Sharma answered 29/10, 2014 at 9:16 Comment(0)
Z
1

In TS, [].push(bufferPart) is not type-compatible, so:

getBufferFromStream(stream: Part | null): Promise<Buffer> {
    if (!stream) {
        throw 'FILE_STREAM_EMPTY';
    }
    return new Promise(
        (r, j) => {
            let buffer = Buffer.from([]);
            stream.on('data', buf => {
               buffer = Buffer.concat([buffer, buf]);
            });
            stream.on('end', () => r(buffer));
            stream.on('error', j);
        }
    );
}
Zincography answered 22/5, 2020 at 8:54 Comment(0)
V
0

You can check the "content-length" header in res.headers. It gives you the length of the content you will receive (how many bytes of data the server will send), though it may be absent when the response uses chunked transfer encoding.
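In the context of the question's client.get callback, that might look roughly like this (my sketch; note it reflects the size of the original download, not of the resized image that gm produces):

client.get(imageUrl, function(res) {
    // May be undefined when the server uses chunked transfer encoding.
    var contentLength = parseInt(res.headers['content-length'], 10);
    console.log('expecting ~%d bytes', contentLength);
    // ... continue processing the response as in the question ...
});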

Viveca answered 25/4, 2021 at 17:35 Comment(0)
