How to stream MongoDB query results with Node.js?

I have been searching for an example of how to stream the result of a MongoDB query to a Node.js client. All the solutions I have found so far seem to read the whole query result at once and only then send it back.

Instead, I would (obviously) like to supply a callback to the query method and have MongoDB call it whenever the next chunk of the result set is available.

I have been looking at Mongoose. Should I use a different driver?

Jan

Adi answered 10/9, 2011 at 15:13 Comment(0)

Streaming in Mongoose became available in version 2.4.0, which was released three months after you posted this question:

Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);

More detailed examples can be found on their documentation page.
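Because `stream()` was later deprecated (see the comment below), newer Mongoose versions offer `Query.prototype.cursor()`, and in recent releases the returned QueryCursor can be consumed with `for await`. The following is a minimal sketch that works with any async iterable of documents. `forEachDoc` and `fakeCursor` are hypothetical names, and `fakeCursor` only stands in for a real query so the snippet runs without a database:

```javascript
// Hypothetical helper: process documents strictly one at a time.
// `cursor` is any async iterable; in recent Mongoose versions,
// Model.find(...).cursor() returns one.
async function forEachDoc(cursor, onDoc) {
  let count = 0;
  for await (const doc of cursor) {
    // The next document is not requested until onDoc has settled
    await onDoc(doc);
    count += 1;
  }
  return count;
}

// Stand-in for a real query cursor so the sketch runs without a database
async function* fakeCursor(docs) {
  for (const doc of docs) {
    yield doc;
  }
}

forEachDoc(fakeCursor([{ name: 'a' }, { name: 'b' }]), async (doc) => {
  console.log(doc.name);
}).then((count) => console.log(`processed ${count} documents`));
```

With Mongoose, the call would look roughly like `forEachDoc(Model.where('created').gte(twoWeeksAgo).cursor(), async (doc) => { /* ... */ })`. Awaiting the callback means documents are handled one at a time, which trades some throughput for bounded memory use.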

Maupassant answered 29/2, 2012 at 13:24 Comment(2)
Mongoose: Query.prototype.stream() is deprecated in mongoose >= 4.5.0; use Query.prototype.cursor() instead. - Claireclairobscure
Can I execute 4 queries using streams and write to the same writable stream? - Igor
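About the question of running four queries into the same writable stream: one approach, assuming each query's results should follow the previous one rather than interleave, is to drain the sources one after another inside a single generator and let `pipeline()` from `stream/promises` end the destination once at the very end. This is a sketch under those assumptions. `concatInto` is a hypothetical name, and each source is any async iterable, such as a cursor or cursor stream:

```javascript
const { pipeline } = require('stream/promises');

// Hypothetical helper: write the results of several queries, one query after
// another, into a single writable. Each source is any async iterable of
// documents (a cursor, a cursor stream, ...).
async function concatInto(sources, destination) {
  await pipeline(
    async function* () {
      for (const source of sources) {
        // Finish one source completely before starting the next
        for await (const doc of source) {
          yield JSON.stringify(doc) + '\n';
        }
      }
    },
    destination // ended once, after the last source is exhausted
  );
}
```

For example, you could call `concatInto([q1.cursor(), q2.cursor(), q3.cursor(), q4.cursor()], res)`, where `q1` to `q4` are hypothetical Mongoose queries. Piping each stream into the writable directly would end the writable after the first source, unless you pass `{ end: false }` and end it yourself.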

Besides the cursor API that others mentioned, node-mongodb-native (the underlying layer that every MongoDB client in Node.js uses) has a nice stream API (#458). Unfortunately, I did not find it documented anywhere else.

Update: there are docs.

It can be used like this:

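// `collection` is an open node-mongodb-native Collection
var stream = collection.find().stream();
stream.on('error', function (err) {
  console.error(err);
});
stream.on('data', function (doc) {
  console.log(doc);
});
stream.on('end', function () {
  // emitted once the cursor is exhausted
  console.log('done');
});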

It implements the Node.js Readable stream interface, so it has all the goodies (pause/resume, pipe, etc.).
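Because the cursor stream is an ordinary Readable, it can be passed to Node's `pipeline()`, which pauses the source while the destination is busy and destroys every stream if one fails. The following is a minimal sketch that assumes newline-delimited JSON is an acceptable output format. `toNdjson` and `streamDocs` are hypothetical names:

```javascript
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical helper: turns each document into one line of JSON.
// The writable side accepts objects; the readable side emits bytes.
function toNdjson() {
  return new Transform({
    writableObjectMode: true,
    transform(doc, _encoding, callback) {
      callback(null, JSON.stringify(doc) + '\n');
    },
  });
}

// Hypothetical helper: because the cursor stream is a regular Readable,
// pipeline() propagates backpressure (pausing it while `destination` is busy)
// and destroys all streams if any of them fails.
async function streamDocs(cursorStream, destination) {
  await pipeline(cursorStream, toNdjson(), destination);
}
```

In an HTTP handler, this might be used as `streamDocs(collection.find().stream(), res)` after `res.writeHead(200, { 'Content-Type': 'application/x-ndjson' })`. The transform is needed because the cursor emits objects, while a response only accepts strings or bytes.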

Leralerch answered 10/6, 2012 at 22:27 Comment(3)
I found documentation for what @Dan Milon is referring to on the mongodb.github.com website. Here it is: CursorStream. - Rhapsodize
Didn't even know this existed! Thanks! That's mongodb.github.com/node-mongodb-native, for that matter. - Leralerch
The documentation is now at docs.mongodb.com/drivers/node/current/fundamentals/crud/… (I've updated the post to this effect). - Exceptionable

Mongoose is not really a "driver"; it's actually an ODM (object-document mapper) wrapper around the MongoDB driver (node-mongodb-native).

To do what you're describing, take a look at the driver's .find() and .each() methods. Here's some code from the driver's examples:

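var util = require('util');

// Find all records. In this (old) callback API, find() hands you a cursor
collection.find(function(err, cursor) {
  if (err) throw err;
  console.log("Printing docs from Cursor Each");
  // each() calls back once per document, then once more with null at the end
  cursor.each(function(err, doc) {
    if (doc != null) console.log("Doc from Each " + util.inspect(doc));
  });
});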

To stream the results, you basically replace that logging call with your own "stream" function. I'm not sure how you plan to stream the results; I think you can do response.write() + response.flush(), but you may also want to check out socket.io.
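If you write to the response by hand, `response.write()` returns `false` once its internal buffer is full. Waiting for `'drain'` before continuing keeps a fast cursor from flooding memory. The following is a minimal sketch that assumes the cursor is async iterable, as cursors in recent driver versions are. `writeDocs` is a hypothetical name:

```javascript
const { once } = require('events');

// Hypothetical helper: writes each document from an async iterable (for
// example a driver cursor) to a writable such as http.ServerResponse.
async function writeDocs(cursor, res) {
  for await (const doc of cursor) {
    // write() returns false once the internal buffer is full
    if (!res.write(JSON.stringify(doc) + '\n')) {
      // Pause until the buffered data has been flushed
      await once(res, 'drain');
    }
  }
  res.end();
}
```

Inside a request handler, you would call `res.writeHead(...)` first and then `writeDocs(collection.find(), res)`. If the response errors while waiting, `once()` rejects, which stops the loop.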

Defaulter answered 11/9, 2011 at 6:21 Comment(6)
Thanks, I found out about the driver issue yesterday. I was expecting the find/cursor solution, but it is surprisingly hard to find examples. Most do a find and then docs.forEach( ... ). - Adi
Update: Actually, I did not get this to work the way you describe. What I had to do was create an EventEmitter to glue the response stream and the data stream from the backend together. - Adi
You're right about the examples; the best you can hope for is the "examples" folder in the source code. An EventEmitter also sounds correct. If you have a good example, we can definitely update this answer with something more detailed. - Defaulter
OK, I edited your answer to point to mine so I can accept yours. - Adi
@JanAlgermissen Do you have a link to a proof of concept? That would be handy. - Paid
@Paid No, it was just a pet project. Basically, I implemented the code in my answer below. - Adi

Here is the solution I found (please correct me if this is the wrong way to do it, and excuse the rough code; it's too late for me to prettify it now):

var http = require('http');
var util = require('util');
var EventEmitter = require('events').EventEmitter;

// Legacy (2011-era) node-mongodb-native callback API
var mongodb = require('mongodb');
var Db = mongodb.Db,
  Connection = mongodb.Connection,
  Collection = mongodb.Collection,
  Server = mongodb.Server;

var db = new Db('test', new Server('localhost', Connection.DEFAULT_PORT, {}));

var products;

db.open(function (error, client) {
  if (error) throw error;
  products = new Collection(client, 'products');
});

// Wraps a collection and re-emits the cursor's results as events
function ProductReader(collection) {
  EventEmitter.call(this);
  this.collection = collection;
}

util.inherits(ProductReader, EventEmitter);

ProductReader.prototype.do = function () {
  var self = this;

  this.collection.find(function (err, cursor) {
    if (err) {
      self.emit('e1');
      return;
    }
    console.log("Printing docs from Cursor Each");

    self.emit('start');
    cursor.each(function (err, doc) {
      // An error while iterating: report it and close the response
      if (err) {
        self.emit('e2');
        self.emit('end');
        return;
      }

      if (doc != null) {
        console.log("doc:" + doc.name);
        self.emit('doc', doc);
      } else {
        // each() passes null once the cursor is exhausted
        self.emit('end');
      }
    });
  });
};

http.createServer(function (req, res) {
  var pr = new ProductReader(products);
  pr.on('e1', function () {
    console.log("E1");
    res.writeHead(500, {"Content-Type": "text/plain"});
    res.write("e1 occurred\n");
    res.end();
  });
  pr.on('e2', function () {
    console.log("E2");
    res.write("ERROR\n");
  });

  pr.on('start', function () {
    console.log("START");
    res.writeHead(200, {"Content-Type": "text/plain"});
    res.write("<products>\n");
  });

  pr.on('doc', function (doc) {
    console.log("A DOCUMENT" + doc.name);
    res.write("<product><name>" + doc.name + "</name></product>\n");
  });

  pr.on('end', function () {
    console.log("END");
    res.write("</products>");
    res.end();
  });

  pr.do();
}).listen(8000);
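You can express the same EventEmitter glue with async iteration instead of the legacy `find(callback)`/`cursor.each()` API. This sketch assumes a cursor that is async iterable, which recent driver versions provide for `collection.find()`. The class mirrors the `ProductReader` above, but it is only an illustration, not the answer's original code:

```javascript
const { EventEmitter } = require('events');

// Illustrative reader: re-emits documents from any async iterable as events
class ProductReader extends EventEmitter {
  async read(cursor) {
    this.emit('start');
    try {
      for await (const doc of cursor) {
        this.emit('doc', doc);
      }
    } catch (err) {
      // EventEmitter throws if 'error' has no listener, so callers should add one
      this.emit('error', err);
    }
    this.emit('end');
  }
}
```

One design caveat: writing to the response from a `'doc'` listener ignores backpressure. For large result sets, a `pipeline()`-based approach is the safer choice.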
Adi answered 11/9, 2011 at 20:56 Comment(0)

I have been studying MongoDB streams myself. While I do not have the entire answer you are looking for, I do have part of it: you can set up a socket.io stream.

This uses JavaScript with socket.io and socket.io-stream (available on npm), plus MongoDB for the database. Using a 40-year-old database that has issues is a mistake, and it's time to modernize. Also, that 40-year-old database is SQL, and to my knowledge SQL doesn't do streams.

Although you only asked about data going from the server to the client, I also want to cover client to server, because I can NEVER find that anywhere when I search. I wanted one place with both the sending and receiving sides via streams so everyone can get the hang of it quickly.

Client side: sending data to the server via a stream

// Assumes `ss` is socket.io-stream, `socket` a connected socket.io client and `data` a Blob
var stream = ss.createStream();
var blobstream = ss.createBlobReadStream(data);
blobstream.pipe(stream);
ss(socket).emit('data.stream', stream, {}, function (err, successful_db_insert_id) {
  // if you get back the id, it went into the db and everything worked
});

Server side: receiving the stream from the client, then replying when done

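// Assumes `socket` is the server-side socket.io connection and `db` a MongoDB collection
ss(socket).on('data.stream', function (stream, o, c) {
  var chunks = [];
  stream.on('data', function (chunk) { chunks.push(chunk); });
  stream.on('end', function () {
    var buffer = Buffer.concat(chunks);
    // `data` is an arbitrary field name; insertOne() returns a promise when no callback is passed
    db.insertOne({ data: buffer }).then(function (result) {
      c(null, result.insertedId);
    }, function (err) {
      c(err.message);
    });
  });
});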
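Readable streams are async iterable, so you can also write the chunk-collecting part of the handler above with `for await`. The following is a minimal sketch. `collect` is a hypothetical helper name:

```javascript
// Hypothetical helper: gathers every chunk of a byte stream into one Buffer,
// like the chunks.push() / Buffer.concat() pair in the handler above.
async function collect(stream) {
  const chunks = [];
  for await (const chunk of stream) {
    // Accept string chunks too, in case the stream has an encoding set
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
  }
  return Buffer.concat(chunks);
}
```

Like the original, this approach holds the whole upload in memory. That is fine for small payloads but not for large files.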

This is the other half: fetching the data and streaming it to the client.

Client side: requesting and receiving stream data from the server

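// `done` is a hypothetical callback of your own that receives the base64 result
var stream = ss.createStream();
var binarystring = '';
stream.on('data', function (chunk) {
  for (var i = 0; i < chunk.length; i++) {
    binarystring += String.fromCharCode(chunk[i]);
  }
});
stream.on('end', function () {
  var data = window.btoa(binarystring);
  done(null, data);
});
ss(socket).emit('data.stream.get', stream, {}, function (err, finished) {
  // the server acknowledges once it has finished piping
});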

Server side: replying to the request for streaming data

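// Assumes `db` is a MongoDB collection; each document goes out as one line of JSON
ss(socket).on('data.stream.get', function (stream, o, c) {
  // 'finish' fires once pipe() has written everything and ended the stream
  stream.on('finish', function () {
    c(null, true);
  });
  db.find().stream({
    // socket.io-stream carries bytes, so turn each document into a Buffer
    transform: function (doc) { return Buffer.from(JSON.stringify(doc) + '\n'); }
  }).pipe(stream);
});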

The very last one is the only one where I'm kind of just pulling it out of my butt, because I have not tried it yet, but it should work. I actually do something similar, but I write the file to the hard drive and then use fs.createReadStream to stream it to the client. So I'm not 100% sure, but from what I've read it should work; I'll get back to you once I test it.

P.S. If anyone wants to bug me about my colloquial way of talking: I'm Canadian and I love saying "eh", so come at me with your hugs and hits, bros/sis :D

Capability answered 8/7, 2018 at 16:35 Comment(0)
